2016-06-08 41 views
5
def get_connection_and_channel(self, connection_parameters): 
    connection = pika.BlockingConnection(connection_parameters) 
    channel = connection.channel() 
    return (connection, channel) 


connection_parameters = pika.ConnectionParameters(server, port, virtual_host, credentials=pika.PlainCredentials(user_name, password)) 

connection,channel = self.get_connection_and_channel(connection_parameters) 

channel.confirm_delivery() 
count=0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip()) 
    count += 1 
self.logger.info('Sent %d messages' % count) 
connection.close() 

我正在使用此代码向RabbitMQ服务器发送邮件。但偶尔,这不会将所有消息发送到相应的队列。它每次运行时都会丢失任意数量的消息。RabbitMq - pika - python - 发布时删除邮件

我不明白这里有什么问题。

回答

2

由于您的邮件无法将邮件路由到任何现有队列,因此可能会返回您的邮件。尝试在channel.confirm_delivery添加的回调:

channel.confirm_delivery(on_delivery_confirmation) 

def on_delivery_confirmation(self, method_frame): 
     confirmation_type = method_frame.method.NAME.split('.')[1].lower()    
     if confirmation_type == 'ack': 
      self.logger.info('message published') 
     elif confirmation_type == 'nack': 
      self.logger.info('message not routed') 

如果是这种情况,那么建议先结合消费者队列与交换和发布消息之前路由键。

1

简单(不可靠)方式

首先,启用耐用队列添加:

channel.queue_declare(queue='your_queue', durable=True) 

到两个,出版商和消费者(做发布/消费前)

然后,即使RabbitMQ服务器死亡并重新启动,您也可以确保您的队列不会丢失。

出版商

在发行,加上properties=pika.BasicProperties(delivery_mode=2)basic_publish电话,以确保您的信息是永久性的。

channel.basic_publish(exchange=self.output_exchange_name, 
         routing_key=routing_key, 
         body=message_body.strip(), 
         properties=pika.BasicProperties(delivery_mode=2)) 

这应该做的伎俩,以避免丢失_published消息。

消费者

从消费者角度来看,该official RabbitMQ tutorial for python说:

为了确保信息不会丢失,RabbitMQ的支持消息确认。一个ack(请求)被从消费者发回,告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。 [...]默认情况下,消息确认已打开。

当你建立了消费者,确保您发送ACK 得当,让RabbitMQ的从队列中删除它。

def callback(ch, method, properties, body): 
    print "Received %r" % (body,) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

channel.basic_consume(callback, queue='your_queue') 

的真正安全的方式

如果你需要一个更强大和更可靠的方法是完全肯定确认发布中继上RabbitMQ的,你应该使用AMQP协议的plublish confirm功能。

pika documentation

import pika 

# Open a connection to RabbitMQ on localhost using all default parameters 
connection = pika.BlockingConnection() 

# Open the channel 
channel = connection.channel() 

# Declare the queue 
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False) 

# Turn on delivery confirmations 
channel.confirm_delivery() 

# Send a message 
if channel.basic_publish(exchange='test', 
         routing_key='test', 
         body='Hello World!', 
         properties=pika.BasicProperties(content_type='text/plain', 
                 delivery_mode=1)): 
    print 'Message publish was confirmed' 
else: 
    print 'Message could not be confirmed' 

所以要根据你的代码,我将使用类似于:

count=0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(), 
          properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent 
     count += 1 
    else: 
     # Do something with your undelivered message 
self.logger.info('Sent %d messages' % count) 
connection.close() 

或者作为蛮力的方法,你可以使用一个while请使用循环代替if以确保发送所有消息:

count = 0 
for json_string in open(json_file, 'r'): 
    result_json = json.loads(json_string) 
    message_body = json.dumps(result_json['body']) 
    routing_key = result_json['RoutingKey'] 
    while not channel.basic_publish(exchange=self.output_exchange_name, 
            routing_key=routing_key, 
            body=message_body.strip(), 
            properties=pika.BasicProperties(delivery_mode=2)): 
     pass # Do nothing or even you can count retries 
    count += 1 
self.logger.info('Sent %d messages' % count) 
0

尝试这与您的命令只接收一个消息:

#!/usr/bin/env python 
import pika 
import ujson as json 


def receive(): 
    parameters = pika.ConnectionParameters(host='localhost') 
    connection = pika.BlockingConnection(parameters) 
    channel = connection.channel() 
    channel.queue_declare(queue='raw_post', durable=True) 

    method_frame, header_frame, body = channel.basic_get(queue='raw_post') 

    if method_frame.NAME == 'Basic.GetEmpty': 
     connection.close() 
     return '' 
    else: 
     channel.basic_ack(delivery_tag=method_frame.delivery_tag) 
     connection.close() 
     return json.loads(body), method_frame.message_count 


a = '' 
while a not in ['quit', 'sair', 'exit', 'bye']: 
    a = input("whats up?") 
    print(receive()) 

只是5000级的邮件发送者排队:

#!/usr/bin/env python 
import pika 
import ujson as json 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='raw_post', durable=True) 

for i in range(5000): 
    info = {"info": "test", "value": i} 

    channel.basic_publish(exchange='', 
          routing_key='raw_post', 
          body=json.dumps(info), 
          properties=pika.BasicProperties(
           delivery_mode=2, # make message persistent 
         )) 

    print(" [x] Sent 'Hello World!' {}".format(i)) 
connection.close()