我正在编写一个Ruby应用程序(Linux x86_64中的Ruby v2.1.3p242),它将重复处理在线数据并将结果存储在数据库中。为了加快速度,我有多个线程同时运行,我一直在努力在命令和线程引发异常时干净地停止所有线程。Ruby - 使用互斥锁阻止线程过早停止
问题是在调用Sever.stop
之后,某些线程将继续运行#do_stuff
的多次迭代。他们最终会停止,但在其他人停止之后,我会看到一些线程运行10到50次。
每个线程的互斥锁在每次迭代之前都被锁定,然后解锁。当调用Server.stop
时,代码@mutex.synchronize { kill }
在每个线程上运行。这应该在下一次迭代后马上杀死线程,但这似乎并不是这种情况。
编辑:
代码工作的,就是可以随意,如果你想测试它。在我的测试中,在调用Server.stop
之后,所有线程都会停止30秒到几分钟。请注意,每次迭代需要1-3秒。我用下面的测试代码(使用ruby -I.
而在同一目录中):
require 'benchmark'
require 'server'
s = Server.new
s.start
puts Benchmark.measure { s.stop }
下面是代码:
server.rb:
require 'server/fetcher_thread'
class Server
THREADS = 8
attr_reader :threads
def initialize
@threads = []
end
def start
create_threads
end
def stop
@threads.map {|t| Thread.new { t.stop } }.each(&:join)
@threads = []
end
private
def create_threads
THREADS.times do |i|
@threads << FetcherThread.new(number: i + 1)
end
end
end
server/fetcher_thread.rb:
class Server
class FetcherThread < Thread
attr_reader :mutex
def initialize(opts = {})
@mutex = Mutex.new
@number = opts[:number] || 0
super do
loop do
@mutex.synchronize { do_stuff }
end
end
end
def stop
@mutex.synchronize { kill }
end
private
def do_stuff
debug "Sleeping for #{time_to_sleep = rand * 2 + 1} seconds"
sleep time_to_sleep
end
def debug(message)
$stderr.print "Thread ##{@number}: #{message}\n"
end
end
end
你能削减你的问题降低到一个可以在纯Ruby上的任何外部系统进行复制,而不依赖代码中的互斥? – Phrogz 2014-10-18 03:38:13
我更改了代码,以便它在每次迭代中运行'do_stuff'。这只是显示一条消息,并休眠1-3秒。该代码的工作原理和演示的问题,所以随时测试它。 – hololeap 2014-10-18 04:16:09
谢谢。这应该有助于你获得帮助。不过,如果所有的代码真正有必要从本质上重现您的问题,我会感到惊讶。例如,是否需要ActiveSupport? – Phrogz 2014-10-18 05:59:14