使用Ruby amqp库和Ruby 1.8.7的v0.7.1,我试图向RabbitMQ服务器发送大量(百万)短(〜40字节)消息。我的程序的主循环(当然,不是一个真正的循环,但仍然)是这样的:向AMQP队列发送大量消息
AMQP.start(:host => '1.2.3.4',
:username => 'foo',
:password => 'bar') do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("foobar", {:durable => true})
i = 0
EM.add_periodic_timer(1) do
print "\rPublished #{i} commits"
end
results = get_results # <- Returns an array
processor = proc do
if x = results.shift then
exchange.publish(x, :persistent => true,
:routing_key => "test.#{i}")
i += 1
EM.next_tick processor
end
end
EM.next_tick(processor)
AMQP.stop {EM.stop} end
代码开始处理结果阵列就好了,但经过一段时间(通常情况下,后12K的消息左右),它具有以下错误的模具
/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send':
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError)
没有消息存储在队列中。错误似乎在从程序到队列服务器的网络活动开始时发生。
我在做什么错?
RabbitMQ日志说什么?经纪人仍在运行? 'lsof -i:5672'返回什么? –
没什么特别的,它表示在脚本启动时打开一个连接,然后关闭连接。甚至在我的代码失败后,RabbitMQ仍能正常地为其他队列和客户端服务。我不认为这是RabbitMQ的问题。 –