简单(不可靠)方式
首先,启用耐用队列添加:
channel.queue_declare(queue='your_queue', durable=True)
到两个,出版商和消费者(做发布/消费前)。
然后,即使RabbitMQ服务器死亡并重新启动,您也可以确保您的队列不会丢失。
出版商
在发行,加上properties=pika.BasicProperties(delivery_mode=2)
您basic_publish
电话,以确保您的信息是永久性的。
channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2))
这应该做的伎俩,以避免丢失_published消息。
消费者
从消费者角度来看,该official RabbitMQ tutorial for python说:
为了确保信息不会丢失,RabbitMQ的支持消息确认。一个ack(请求)被从消费者发回,告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。 [...]默认情况下,消息确认已打开。
当你建立了消费者,确保您发送ACK 得当,让RabbitMQ的从队列中删除它。
def callback(ch, method, properties, body):
print "Received %r" % (body,)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue='your_queue')
的真正安全的方式
如果你需要一个更强大和更可靠的方法是完全肯定确认发布中继上RabbitMQ的,你应该使用AMQP协议的plublish confirm功能。
从pika documentation:
import pika
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send a message
if channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1)):
print 'Message publish was confirmed'
else:
print 'Message could not be confirmed'
所以要根据你的代码,我将使用类似于:
count=0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent
count += 1
else:
# Do something with your undelivered message
self.logger.info('Sent %d messages' % count)
connection.close()
或者作为蛮力的方法,你可以使用一个while
请使用循环代替if
以确保发送所有消息:
count = 0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
while not channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)):
pass # Do nothing or even you can count retries
count += 1
self.logger.info('Sent %d messages' % count)