2014-03-27 58 views
2

我正在尝试使用多个处理器创建迭代Enumerables的通用方法。我正在使用fork产生给定数量的工人,并为他们提供数据以处理重复使用闲置的工人。但是,我想同步输入和输出顺序。如果作业1和作业2同时启动,作业2在作业1之前完成,则结果顺序不同步。我想以某种方式缓存输出以同步输出顺序,但是我没有看到如何实现这一点。Ruby:同步fork池输出

#!/usr/bin/env ruby 

require 'pp' 

DEBUG = false 
CPUS = 2 

module Enumerable 
    # Fork each (feach) creates a fork pool with a specified number of processes 
    # to iterate over the Enumerable object processing the specified block. 
    # Calling feach with :processes => 0 disables forking for debugging purposes. 
    # It is possible to disable synchronized output with :synchronize => false 
    # which will save some overhead. 
    # 
    # @example - process 10 elements using 4 processes: 
    # 
    # (0 ... 10).feach(:processes => 4) { |i| puts i; sleep 1 } 
    def feach(options = {}, &block) 
    $stderr.puts "Parent pid: #{Process.pid}" if DEBUG 

    procs = options[:processes] || 0 
    sync = options[:synchronize] || true 

    if procs > 0 
     workers = spawn_workers(procs, &block) 
     threads = [] 

     self.each_with_index do |elem, index| 
     $stderr.puts "elem: #{elem} index: #{index}" if DEBUG 

     threads << Thread.new do 
      worker = workers[index % procs] 
      worker.process(elem) 
     end 

     if threads.size == procs 
      threads.each { |thread| thread.join } 
      threads = [] 
     end 
     end 

     threads.each { |thread| thread.join } 
     workers.each { |worker| worker.terminate } 
    else 
     self.each do |elem| 
     block.call(elem) 
     end 
    end 
    end 

    def spawn_workers(procs, &block) 
    workers = [] 

    procs.times do 
     child_read, parent_write = IO.pipe 
     parent_read, child_write = IO.pipe 

     pid = Process.fork do 
     begin 
      parent_write.close 
      parent_read.close 
      call(child_read, child_write, &block) 
     ensure 
      child_read.close 
      child_write.close 
     end 
     end 

     child_read.close 
     child_write.close 

     $stderr.puts "Spawning worker with pid: #{pid}" if DEBUG 

     workers << Worker.new(parent_read, parent_write, pid) 
    end 

    workers 
    end 

    def call(child_read, child_write, &block) 
    while not child_read.eof? 
     elem = Marshal.load(child_read) 
     $stderr.puts "  call with Process.pid: #{Process.pid}" if DEBUG 
     result = block.call(elem) 
     Marshal.dump(result, child_write) 
    end 
    end 

    class Worker 
    attr_reader :parent_read, :parent_write, :pid 

    def initialize(parent_read, parent_write, pid) 
     @parent_read = parent_read 
     @parent_write = parent_write 
     @pid   = pid 
    end 

    def process(elem) 
     Marshal.dump(elem, @parent_write) 
     $stderr.puts " process with worker pid: #{@pid} and parent pid: #{Process.pid}" if DEBUG 
     Marshal.load(@parent_read) 
    end 

    def terminate 
     $stderr.puts "Terminating worker with pid: #{@pid}" if DEBUG 
     Process.wait(@pid, Process::WNOHANG) 
     @parent_read.close 
     @parent_write.close 
    end 
    end 
end 

def fib(n) n < 2 ? n : fib(n-1)+fib(n-2); end # Lousy Fibonacci calculator <- heavy job 

(0 ... 10).feach(processes: CPUS) { |i| puts "#{i}: #{fib(35)}" } 
+0

考虑让您的子进程写入数据库。您可以将足够的信息(可能是工作号码)传递给他们,然后将他们的结果与他们一起存储。清理脚本然后可以遍历表,使用'order by'子句按顺序检索结果。 –

+0

@theTinMan我想避免数据库或限制磁盘I/O的任何事情。我可以通过IO管道将消息/结果发送给儿童或从儿童发送消息 - 因此,我不是通过数据库来调用基于内存的简单缓存(Ruby对象),而是一种控制缓存而不是清理脚本的方法。 – maasha

回答

1

没有办法,除非强制所有子进程将它们的输出发送到家长和有它的结果,你执行某种进程之间的I/O锁定的排序,或同步输出。

不知道你的长期目标是什么,很难提出解决方案。一般来说,在每个过程中都需要大量工作才能使用fork来获得任何显着的加速,并且没有简单的方法将结果返回到主程序。

本机线程(Linux上的pthreads)可能更有意义完成您正在尝试执行的操作,但并非所有版本的Ruby都支持该级别的线程。请参阅:

Does ruby have real multithreading?

+0

我想通过阅读块来解析大文件,并将实际的解析委托给工作人员。我可以使用IO管道向工作人员发送块并将结果发送给父级。智能缓存和控制器可以检测出完成了哪些作业并按顺序输出,这应该是可行的 - 我想。 – maasha

+0

我对Ruby线程不感兴趣,因为它们不支持多处理器工作 - 至少在MRI中不支持。 – maasha

+0

看看jRuby。底层的Java有真正的线程。但是定义“巨大的文件”。而且,你是否研究过MQ体系结构。他们可以非常快速。 –