2016-02-17 97 views
1

我已经安装了该插件从这里rabbitmq-delayed-message-exchange发送延迟的消息。我怎么会使用RabbitMQ的延迟消息交换插件在RabbitMQ的发送延迟的消息?

我找不到在Python中使用它的任何帮助。我刚开始使用rabbitmq。

这是我一直想:

import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.exchange_declare("test-x", type="x-delayed-message", arguments={"x-delayed-type":"direct"}) 
channel.queue_declare(queue='task_queue',durable=True) 
channel.queue_bind(queue="task_queue", exchange="test-x", routing_key="task_queue") 
channel.basic_publish(exchange='test-x',routing_key='task_queue',body='Hello World! Delayed',arguments={"x-delay":100}) 
print(" [x] Sent 'Hello World! Delayed'") 
connection.close() 

这里是交易所上市:

sudo rabbitmqctl list_exchanges 
Listing exchanges ... 
amq.direct direct 
test-x x-delayed-message 
amq.fanout fanout 
amq.match headers 
amq.headers headers 
    direct 
amq.rabbitmq.trace topic 
amq.topic topic 
amq.rabbitmq.log topic 

我没有一个好主意,我怎么能延迟参数传递给basic_publish功能

任何帮助表示赞赏

回答

1

您需要添加x-delay标头指向您的消息属性并指定毫秒中的延迟值。试试这个:

channel.basic_publish(
    exchange='test-x', 
    routing_key='task_queue', 
    body='Hello World! Delayed', 
    properties=pika.BasicProperties(headers={"x-delay": 1000}) 
) 
+0

它确实有效。将现在检查延迟。很棒 –

1

实际上你可以推迟的消息,而无需使用插件。使用QUEUE TTL - - 使用消息TTL 如果在队列中的所有消息将被延迟固定的时间使用的队列TTL兔队列 消息可以以2种方式 被延迟。 如果每个消息必须通过不同的时间延迟使用消息TTL。 我已经用python3和pika模块解释了它。以毫秒为单位 鼠BasicProperties参数“到期”具有要被设置在延迟队列延迟的消息。 设定到期时间之后,发布消息到delayed_queue(“不实际队列,消费者正在等待消耗”),一旦在delayed_queue消息到期,消息将使用交换“amq.direct”

def delay_publish(self, messages, queue, headers=None, expiration=0): 
    """ 
    Connect to RabbitMQ and publish messages to the queue 
    Args: 
     queue (string): queue name 
     messages (list or single item): messages to publish to rabbit queue 
     expiration(int): TTL in milliseconds for message 
    """ 
    delay_queue = "".join([queue, "_delay"]) 
    logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue)) 
    logging.info('Connecting to RabbitMQ: {host}'.format(
     host=self.rabbit_host)) 
    credentials = pika.PlainCredentials(
     RABBIT_MQ_USER, RABBIT_MQ_PASS) 
    parameters = pika.ConnectionParameters(
     rabbit_host, RABBIT_MQ_PORT, 
     RABBIT_MQ_VHOST, credentials, heartbeat_interval=0) 
    connection = pika.BlockingConnection(parameters) 

    channel = connection.channel() 
    channel.queue_declare(queue=queue, durable=True) 

    channel.queue_bind(exchange='amq.direct', 
         queue=queue) 
    delay_channel = connection.channel() 
    delay_channel.queue_declare(queue=delay_queue, durable=True, 
           arguments={ 
            'x-dead-letter-exchange': 'amq.direct', 
            'x-dead-letter-routing-key': queue 
           }) 

    properties = pika.BasicProperties(
     delivery_mode=2, headers=headers, expiration=str(expiration)) 

    if type(messages) not in (list, tuple): 
     messages = [messages] 

    try: 
     for message in messages: 
      try: 
       json_data = json.dumps(message) 
      except Exception as err: 
       logging.error(
        'Error Jsonify Payload: {err}, {payload}'.format(
         err=err, payload=repr(message)), exc_info=True 
       ) 
       if (type(message) is dict) and ('data' in message): 
        message['data'] = {} 
        message['error'] = 'Payload Invalid For JSON' 
        json_data = json.dumps(message) 
       else: 
        raise 

      try: 
       delay_channel.basic_publish(
        exchange='', routing_key=delay_queue, 
        body=json_data, properties=properties) 
      except Exception as err: 
       logging.error(
        'Error Publishing Data: {err}, {payload}'.format(
         err=err, payload=json_data), exc_info=True 
       ) 
       raise 

    except Exception: 
     raise 

    finally: 
     logging.info(
      'Done Publishing. Closing Connection to {queue}'.format(
       queue=delay_queue 
      ) 
     ) 
     connection.close() 
被路由到一个实际队列
+0

不知道。虽然它很长,下次还会利用它。谢谢您的帮助。 –