2013-07-30 36 views
0

我为我的Ruby on Rails应用程序使用AMQP/RabbitMQ。带有PhusionPassenger/Rails的AMQP/RabbitMQ-Server/EventMachine只能接收第一条消息

我把下面的amqp.rb文件在配置/初始化:(复制和配方改变:http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs

require 'amqp' 

# References: 
# 1. Getting Started with AMQP and Ruby 
#  http://rubyamqp.info/articles/getting_started/ 
# 2. EventMachine and Rails 
#  http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs 
# 3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra 
#  http://rubyamqp.info/articles/connecting_to_broker/ 
module AppEventMachine 
    def self.start 
    if defined?(PhusionPassenger) 
     Rails.logger.info "###############################################################################" 
     Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......" 
     Rails.logger.info "###############################################################################" 
     PhusionPassenger.on_event(:starting_worker_process) do |forked| 
     # => for passenger, we need to avoid orphaned threads 
     if forked && EventMachine.reactor_running? 
      EventMachine.stop 
     end 

     spawn_eventmachine_thread 
     die_gracefully_on_signal 
     end 
    else 
     Rails.logger.info "###############################################################################" 
     Rails.logger.info "PhusionPassenger is not running. Probably you are running Rails locally ......" 
     Rails.logger.info "###############################################################################" 

     # faciliates debugging 
     Thread.abort_on_exception = true 
     # just spawn a thread and start it up 
     spawn_eventmachine_thread unless defined?(Thin) 
     # Thin is built on EventMachine, doesn't need this thread 
    end 
    end 

    def self.spawn_eventmachine_thread 
    Thread.new { 
     EventMachine.run do 
     AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST)) 
     AMQP.channel.on_error(&method(:handle_channel_exception)) 
     AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true) 
        .subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) } 
     end 
    } 
    end 

    def self.handle_channel_exception(channel, channel_close) 
    Rails.logger.error "###############################################################################" 
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" 
    Rails.logger.error "###############################################################################" 
    end 

    def self.die_gracefully_on_signal 
    Signal.trap("INT") { 
     Rails.logger.error "###############################################################################" 
     Rails.logger.error "Stopping the EventMachine ......" 
     EventMachine.stop 
     Rails.logger.error "###############################################################################" 
    } 
    Signal.trap("TERM") { 
     Rails.logger.error "###############################################################################" 
     Rails.logger.error "Stopping the EventMachine ......" 
     EventMachine.stop 
     Rails.logger.error "###############################################################################" 
    } 
    end 
end 

AppEventMachine.start 

后,我开始与Rails的PhusionPassenger,我看到了它与PhusionPassenger运行,并且然后,我尝试将消息发送到队列,但让我吃惊:

.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) } 

subscribe处理器只执行一次,即只接收第一条消息,任何其他消息(第二,第三),在subscribe H安德勒永远不会被召唤。

+0

如果您的工作失败,可能会发生这种情况。尝试用“puts payload”替换MixpanelJob :: handle_sending(元数据,有效载荷)并查看它是否有效,然后 – Joshua

+0

哦,是的,你说得对。看来我的'MixpanelJob :: handle_sending'失败了。我需要处理。嗯,奇怪的是'Mixpanel'从不会引发错误。 –

回答

0

感谢Joshua的评论。事实证明,MixpanelJob::handl_sending失败,并且它阻塞了EventMachine线程。

相关问题