2011-07-02 102 views
25

我很想知道什么是实现基于线程队列的最佳方式。线程和队列

例如:

我有,我想只有4线程执行10分的动作。我想创建一个队列,其中包含线性放置的所有10个动作,并使用4个线程启动前4个动作,一旦一个线程完成执行,下一个将启动等 - 所以一次,线程数目为4或小于4.

回答

28

有一个Queuethread标准库。使用你可以做这样的事情:

require 'thread' 

queue = Queue.new 
threads = [] 

# add work to the queue 
queue << work_unit 

4.times do 
    threads << Thread.new do 
    # loop until there are no more things to do 
    until queue.empty? 
     # pop with the non-blocking flag set, this raises 
     # an exception if the queue is empty, in which case 
     # work_unit will be set to nil 
     work_unit = queue.pop(true) rescue nil 
     if work_unit 
     # do work 
     end 
    end 
    # when there is no more work, the thread will stop 
    end 
end 

# wait until all threads have completed processing 
threads.each { |t| t.join } 

我与非阻塞标志弹出的原因是until queue.empty?和流行之间的另一个线程可能已经pop'ed队列,所以除非无阻塞国旗已设置,我们可以永远卡在那条线上。

如果您使用MRI,默认的Ruby解释器,请记住线程不会绝对并发。如果你的工作是CPU限制的,你也可以运行单线程。如果你有一些阻塞IO的操作,你可能会得到一些并行性,但是YMMV。或者,您可以使用允许完全并发的解释器,如jRuby或Rubinius。

+1

在镐,它表明具有4':END_OF_WORK''work_unit's代替具有无阻塞弹出。另外,关于没有CPU的线程同时运行的最后一条语句适用于YARV,但不适用于JRuby。 –

+0

@AndrewGrimm,我喜欢这个答案,因为有时候你想要一个工作队列和线程在任何时候添加一个新的工作项目时都会继续工作。 – akostadinov

7

那里有几块宝石可以为你实现这个模式;平行,桃子和我的名字叫做threach(或者jruby下的jruby_threach)。它是#each的替代替代品,但允许您指定要运行多少个线程,使用底下的SizedQueue来防止螺旋失控。

所以......

(1..10).threach(4) {|i| do_my_work(i) } 

不推我自己的东西;有很多很好的实现可以让事情变得更简单。

如果您使用的是JRuby,jruby_threach是一个更好的实现 - Java只是提供了更丰富的线程主数据和数据结构来使用。

5

可执行描述性示例:

require 'thread' 

p tasks = [ 
    {:file => 'task1'}, 
    {:file => 'task2'}, 
    {:file => 'task3'}, 
    {:file => 'task4'}, 
    {:file => 'task5'} 
] 

tasks_queue = Queue.new 
tasks.each {|task| tasks_queue << task} 

# run workers 
workers_count = 3 
workers = [] 
workers_count.times do |n| 
    workers << Thread.new(n+1) do |my_n| 
     while (task = tasks_queue.shift(true) rescue nil) do 
      delay = rand(0) 
      sleep delay 
      task[:result] = "done by worker ##{my_n} (in #{delay})" 
      p task 
     end 
    end 
end 

# wait for all threads 
workers.each(&:join) 

# output results 
puts "all done" 
p tasks 
1

我使用了一个名为work_queue的宝石。它非常实用。

实施例:

require 'work_queue' 
wq = WorkQueue.new 4, 10 
(1..10).each do |number| 
    wq.enqueue_b("Thread#{number}") do |thread_name| 
     puts "Hello from the #{thread_name}" 
    end 
end 
wq.join