2014-10-18 51 views
1

我正在编写一个Ruby应用程序(Linux x86_64中的Ruby v2.1.3p242),它将重复处理在线数据并将结果存储在数据库中。为了加快速度,我有多个线程同时运行,我一直在努力在命令和线程引发异常时干净地停止所有线程。Ruby - 使用互斥锁阻止线程过早停止

问题是在调用Sever.stop之后,某些线程将继续运行#do_stuff的多次迭代。他们最终会停止,但在其他人停止之后,我会看到一些线程运行10到50次。

每个线程的互斥锁在每次迭代之前都被锁定,然后解锁。当调用Server.stop时,代码@mutex.synchronize { kill }在每个线程上运行。这应该在下一次迭代后马上杀死线程,但这似乎并不是这种情况。

编辑:

代码工作的,就是可以随意,如果你想测试它。在我的测试中,在调用Server.stop之后,所有线程都会停止30秒到几分钟。请注意,每次迭代需要1-3秒。我用下面的测试代码(使用ruby -I.而在同一目录中):

require 'benchmark' 
require 'server' 

s = Server.new 
s.start 
puts Benchmark.measure { s.stop } 

下面是代码:

server.rb:

require 'server/fetcher_thread' 

class Server 
    THREADS = 8 

    attr_reader :threads 
    def initialize 
    @threads = [] 
    end 

    def start 
    create_threads 
    end 

    def stop 
    @threads.map {|t| Thread.new { t.stop } }.each(&:join) 
    @threads = [] 
    end 

    private 

    def create_threads 
    THREADS.times do |i| 
     @threads << FetcherThread.new(number: i + 1) 
    end 
    end 
end 

server/fetcher_thread.rb:

class Server 
    class FetcherThread < Thread 
    attr_reader :mutex 

    def initialize(opts = {}) 
     @mutex = Mutex.new 
     @number = opts[:number] || 0 

     super do  
     loop do 
      @mutex.synchronize { do_stuff } 
     end 
     end 
    end 

    def stop 
     @mutex.synchronize { kill } 
    end 

    private 

    def do_stuff 
     debug "Sleeping for #{time_to_sleep = rand * 2 + 1} seconds" 
     sleep time_to_sleep 
    end 

    def debug(message) 
     $stderr.print "Thread ##{@number}: #{message}\n" 
    end 
    end 
end 
+0

你能削减你的问题降低到一个可以在纯Ruby上的任何外部系统进行复制,而不依赖代码中的互斥? – Phrogz 2014-10-18 03:38:13

+1

我更改了代码,以便它在每次迭代中运行'do_stuff'。这只是显示一条消息,并休眠1-3秒。该代码的工作原理和演示的问题,所以随时测试它。 – hololeap 2014-10-18 04:16:09

+0

谢谢。这应该有助于你获得帮助。不过,如果所有的代码真正有必要从本质上重现您的问题,我会感到惊讶。例如,是否需要ActiveSupport? – Phrogz 2014-10-18 05:59:14

回答

1

不能保证调用stop的线程将在循环的下一次迭代之前获取互斥锁。这完全取决于Ruby和操作系统调度程序,某些操作系统(including Linux)不实现FIFO调度算法,但考虑其他因素以优化性能。

您可以通过避免kill并使用变量干净地退出循环来使其更具可预测性。然后,你只需要环绕访问变量

class Server 
    class FetcherThread < Thread 
    attr_reader :mutex 

    def initialize(opts = {}) 
     @mutex = Mutex.new 
     @number = opts[:number] || 0 

     super do  
     until stopped? 
      do_stuff 
     end 
     end 
    end 

    def stop 
     mutex.synchronize { @stop = true } 
    end 

    def stopped? 
     mutex.synchronize { @stop } 
    end 

    #... 
    end 
end