2012-10-01 94 views
1

我有一堆芹菜任务将其结果发送到RabbitMQ消息队列。发布的结果可能会变得非常大(高达几兆)。对于将大量数据放入RabbitMQ消息是否是一个好主意,意见不统一,但我在其他情况下看到了这项工作,只要内存得到控制,它似乎就可以工作。用Pika 0.9.5发送大量消息给RabbitMQ:消息悄悄被兔子丢弃

但是,对于我目前的一组任务,兔子似乎只是丢弃似乎太大的消息。我已经缩小它归结为一个相当简单的测试用例:

#!/usr/bin/env python 
import string 
import random 
import pika 
import os 
qname='examplequeue' 
connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='mq.example.com')) 
channel = connection.channel() 

channel.queue_declare(queue=qname,durable=True) 

N=100000 
body = ''.join(random.choice(string.ascii_uppercase) for x in range(N)) 

promise = channel.basic_publish(exchange='', routing_key=qname, body=body, mandatory=0, immediate=0, properties=pika.BasicProperties(content_type="text/plain",delivery_mode=2)) 

print " [x] Sent 'Hello World!'" 
connection.close() 

我有3个节点的RabbitMQ集群,并mq.example.com圆知更鸟到每个节点。客户端在Ubuntu 12.04上使用Pika 0.9.5,RabbitMQ集群在Erlang R14B04上运行RabbitMQ 2.8.7。

执行此脚本将输出print语句并退出,不会引发任何异常。该消息从未出现在RabbitMQ中。

N更改为10000使其按预期工作。

为什么?

+0

是否RabbitMQ的日志说什么?如果您将芹菜更改为使用librabbitmq客户端,它有帮助吗? (你只需要为amqp://别名使用'pip install librabbitmq'来使用它) – asksol

回答

1

我想你在RabbitMq有tcp-backpressure mechanizm问题。你可以阅读约http://www.rabbitmq.com/memory.html。 我看到两个办法来解决这个问题:

  1. 添加TCP回调,并重新从兔每个TCP呼叫
  2. 用它发送给兔子之前压缩的消息,它将使容易推兔子。
def compress(s): 
    return binascii.hexlify(zlib.compress(s)) 

def decompress(s): 
    return zlib.decompress(binascii.unhexlify(s))