我很想知道什么是实现基于线程队列的最佳方式。线程和队列
例如:
我有,我想只有4线程执行10分的动作。我想创建一个队列,其中包含线性放置的所有10个动作,并使用4个线程启动前4个动作,一旦一个线程完成执行,下一个将启动等 - 所以一次,线程数目为4或小于4.
我很想知道什么是实现基于线程队列的最佳方式。线程和队列
例如:
我有,我想只有4线程执行10分的动作。我想创建一个队列,其中包含线性放置的所有10个动作,并使用4个线程启动前4个动作,一旦一个线程完成执行,下一个将启动等 - 所以一次,线程数目为4或小于4.
您可以使用线程池。对于这类问题来说,这是一个相当常见的模式。
http://en.wikipedia.org/wiki/Thread_pool_pattern
Github上似乎有几个实现,你可以尝试一下:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool
有一个Queue
类thread
标准库。使用你可以做这样的事情:
require 'thread'
queue = Queue.new
threads = []
# add work to the queue
queue << work_unit
4.times do
threads << Thread.new do
# loop until there are no more things to do
until queue.empty?
# pop with the non-blocking flag set, this raises
# an exception if the queue is empty, in which case
# work_unit will be set to nil
work_unit = queue.pop(true) rescue nil
if work_unit
# do work
end
end
# when there is no more work, the thread will stop
end
end
# wait until all threads have completed processing
threads.each { |t| t.join }
我与非阻塞标志弹出的原因是until queue.empty?
和流行之间的另一个线程可能已经pop'ed队列,所以除非无阻塞国旗已设置,我们可以永远卡在那条线上。
如果您使用MRI,默认的Ruby解释器,请记住线程不会绝对并发。如果你的工作是CPU限制的,你也可以运行单线程。如果你有一些阻塞IO的操作,你可能会得到一些并行性,但是YMMV。或者,您可以使用允许完全并发的解释器,如jRuby或Rubinius。
那里有几块宝石可以为你实现这个模式;平行,桃子和我的名字叫做threach
(或者jruby下的jruby_threach
)。它是#each的替代替代品,但允许您指定要运行多少个线程,使用底下的SizedQueue来防止螺旋失控。
所以......
(1..10).threach(4) {|i| do_my_work(i) }
不推我自己的东西;有很多很好的实现可以让事情变得更简单。
如果您使用的是JRuby,jruby_threach
是一个更好的实现 - Java只是提供了更丰富的线程主数据和数据结构来使用。
可执行描述性示例:
require 'thread'
p tasks = [
{:file => 'task1'},
{:file => 'task2'},
{:file => 'task3'},
{:file => 'task4'},
{:file => 'task5'}
]
tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}
# run workers
workers_count = 3
workers = []
workers_count.times do |n|
workers << Thread.new(n+1) do |my_n|
while (task = tasks_queue.shift(true) rescue nil) do
delay = rand(0)
sleep delay
task[:result] = "done by worker ##{my_n} (in #{delay})"
p task
end
end
end
# wait for all threads
workers.each(&:join)
# output results
puts "all done"
p tasks
Celluloid具有执行此。
我使用了一个名为work_queue的宝石。它非常实用。
实施例:
require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
wq.enqueue_b("Thread#{number}") do |thread_name|
puts "Hello from the #{thread_name}"
end
end
wq.join
在镐,它表明具有4':END_OF_WORK''work_unit's代替具有无阻塞弹出。另外,关于没有CPU的线程同时运行的最后一条语句适用于YARV,但不适用于JRuby。 –
@AndrewGrimm,我喜欢这个答案,因为有时候你想要一个工作队列和线程在任何时候添加一个新的工作项目时都会继续工作。 – akostadinov