2011-08-17 192 views
0

使用Ruby amqp库和Ruby 1.8.7的v0.7.1,我试图向RabbitMQ服务器发送大量(百万)短(〜40字节)消息。我的程序的主循环(当然,不是一个真正的循环,但仍然)是这样的:向AMQP队列发送大量消息

AMQP.start(:host => '1.2.3.4', 
     :username => 'foo', 
     :password => 'bar') do |connection| 

    channel = AMQP::Channel.new(connection) 
    exchange = channel.topic("foobar", {:durable => true}) 
    i = 0 

    EM.add_periodic_timer(1) do 
    print "\rPublished #{i} commits" 
    end 

    results = get_results # <- Returns an array 

    processor = proc do 
    if x = results.shift then 
     exchange.publish(x, :persistent => true, 
         :routing_key => "test.#{i}") 
     i += 1 
     EM.next_tick processor 
     end 
    end 
    EM.next_tick(processor) 
    AMQP.stop {EM.stop} end 

代码开始处理结果阵列就好了,但经过一段时间(通常情况下,后12K的消息左右),它具有以下错误的模具

/Library/Ruby/Gems/1.8/gems/amqp-0.7.1/lib/amqp/channel.rb:807:in `send': 
The channel 1 was closed, you can't use it anymore! (AMQP::ChannelClosedError) 

没有消息存储在队列中。错误似乎在从程序到队列服务器的网络活动开始时发生。

我在做什么错?

+0

RabbitMQ日志说什么?经纪人仍在运行? 'lsof -i:5672'返回什么? –

+0

没什么特别的,它表示在脚本启动时打开一个连接,然后关闭连接。甚至在我的代码失败后,RabbitMQ仍能正常地为其他队列和客户端服务。我不认为这是RabbitMQ的问题。 –

回答

-1

第一个错误是您没有发布您正在使用的RabbitMQ版本。许多人正在运行陈旧的旧版本1.7.2,因为这是他们的OS软件包存储库中的内容。对于发送您的消息量的任何人都是不好的举动。从RabbitMQ站点获取RabbitMQ 2.5.1,并摆脱默认的系统包。

第二个错误是您没有告诉我们RabbitMQ日志中的内容。

第三个错误是你没有说什么消费信息。是否有另一个进程运行某个地方声明了队列并将其绑定到交换机上。除非有人向RabbitMQ声明并将其绑定到交换机,否则有消息队列。即使这样,如果队列的绑定密钥与您发布的路由密钥匹配,消息也只会流动。

第四个错误。您将路由密钥和绑定密钥混合起来。路由密钥是一个字符串,例如topic.test.json.echos,绑定密钥(用于将队列绑定到交换机)是一种类似主题。#或主题的模式。 .json。

你澄清后更新 至于版本,我不知道,当它被固定的,但是有一个问题,在1.7.2有大量造成的RabbitMQ持久性消息的崩溃,当它滑过它的持久性日志,并在崩溃后,它无法重新启动,直到有人手动取消翻转。

当你说连接正在打开和关闭时,我希望它不是每条消息。这将是使用AMQP的一种奇怪的方式。

让我重复一遍。生产者做不是将消息写入队列。他们将消息写入交换机,然后根据路由键(字符串)和队列的绑定键(模式)将消息路由到队列。在你的例子中,我错误地使用了#号,但是我没有看到任何声明队列并将它绑定到交换机的东西。

+0

让我澄清一下:1.的确,我正在使用Debian的默认版本(1.8.1)。使用Python的AMQP库,这个版本可以忍受数十万条消息而不会冒汗。 2.没有什么特别的,只是连接打开,然后关闭。 3.有几个消费者声明适当的队列绑定(经过验证的工作)。但是,消费者不应该影响生产者将消息写入队列的能力。不,这不是一个队列绑定,这是Ruby替换字符串的方式。 –

+0

总而言之,我认为这是一个Ruby AMQP/eventmachine bug,而不是我的代码或RabbitMQ中的错误。整个设置对于少量消息正常工作。 –