AMQP/RabbitMQ-Server/EventMachine with PhusionPassenger/Rails only receives the first message
I use AMQP/RabbitMQ for my Ruby on Rails application.
I put the following amqp.rb file in config/initializers (copied and adapted from the recipe at http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs):
require 'amqp'
# References:
# 1. Getting Started with AMQP and Ruby
# http://rubyamqp.info/articles/getting_started/
# 2. EventMachine and Rails
# http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
# 3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
# http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
  def self.start
    if defined?(PhusionPassenger)
      Rails.logger.info "###############################################################################"
      Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
      Rails.logger.info "###############################################################################"
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        # => for passenger, we need to avoid orphaned threads
        if forked && EventMachine.reactor_running?
          EventMachine.stop
        end
        spawn_eventmachine_thread
        die_gracefully_on_signal
      end
    else
      Rails.logger.info "###############################################################################"
      Rails.logger.info "PhusionPassenger is not running. Probably you are running Rails locally ......"
      Rails.logger.info "###############################################################################"
      # facilitates debugging
      Thread.abort_on_exception = true
      # just spawn a thread and start it up
      spawn_eventmachine_thread unless defined?(Thin)
      # Thin is built on EventMachine, doesn't need this thread
    end
  end

  def self.spawn_eventmachine_thread
    Thread.new {
      EventMachine.run do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST))
        AMQP.channel.on_error(&method(:handle_channel_exception))
        AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
          .subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
      end
    }
  end

  def self.handle_channel_exception(channel, channel_close)
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
    Rails.logger.error "###############################################################################"
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
    Signal.trap("TERM") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
  end
end
AppEventMachine.start
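After that, I started Rails with PhusionPassenger and could see from the log that it was running under PhusionPassenger. I then tried sending messages to the queue, but to my surprise: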
.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
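the subscribe handler is executed only once, i.e. only the first message is received. For any further message (the second, the third, and so on), the subscribe handler is never called.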
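This can happen if your job fails. Try replacing MixpanelJob::handle_sending(metadata, payload) with "puts payload" and see whether it works then. – Joshua
Oh yes, you're right. It looks like my 'MixpanelJob::handle_sending' is failing. I need to handle that. Hmm, the strange thing is that 'Mixpanel' never raises an error. –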
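To illustrate the point made in the comments, here is a minimal sketch of one way to keep a failing job from silencing the subscriber. It assumes the cause is an exception raised inside the subscribe block that escapes EventMachine.run and ends the reactor thread, which would explain why only the first message is handled. SafeSubscriber and its wrap method are hypothetical names, not part of the amqp gem, and the logger is assumed to be any object that responds to error (for example Rails.logger).

```ruby
# Hypothetical helper, not part of the amqp gem: wraps a message handler so
# that an exception in one message is logged instead of propagating out of
# the block (and, presumably, out of the EventMachine reactor thread).
module SafeSubscriber
  # logger:  any object responding to #error (assumption: e.g. Rails.logger)
  # handler: the block that processes (metadata, payload)
  # Returns a proc that can be passed wherever the original block was used.
  def self.wrap(logger, &handler)
    proc do |metadata, payload|
      begin
        handler.call(metadata, payload)
      rescue StandardError => e
        # Log and swallow the error so that later messages are still handled.
        logger.error("subscribe handler failed: #{e.class}: #{e.message}")
        nil
      end
    end
  end
end

# Possible usage inside spawn_eventmachine_thread (untested sketch):
#
#   handler = SafeSubscriber.wrap(Rails.logger) do |metadata, payload|
#     MixpanelJob::handle_sending(metadata, payload)
#   end
#   AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
#     .subscribe(&handler)
```

Whether swallowing the error is appropriate depends on the job; the logged class and message should at least show what handle_sending is actually failing on.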