我正在调查使用rabbitmq来管理我的应用程序的事件。更具体地说,我想:如何用保证的FIFO控制最大并行度?
- 确保我得到FIFO处理的每个队列事件:一个新的事件不处理,直到所有先前的事件已完全处理。
- 确保我可以控制并行执行的事件数量。
一个典型的例子是,我有200到800个队列,我想不允许超过8名工人并行。
我已决定使用n + 1个队列并且n + m的工人(N = 200〜800,m为8):
- 第一类型的工人(n)的负责确保FIFO为 一个队列内的所有事件
- 第二类型的工人(米)的只是在一个并行方式
这里执行的事件是伪代码:
def queues_declare(channel):
channel.queue_declare(queue='type1', durable=True)
channel.queue_declare(queue='type1_callback', durable=True)
channel.queue_declare(queue='type2', durable=True)
def type1(channel):
def callback_type1(ch, method, properties, body):
channel.basic_publish(exchange='',
routing_key='type2',
body=body,
properties=pika.BasicProperties(
reply_to = "type1_callback",
correlation_id = method.delivery_tag,
delivery_mode = 2,
))
def callback_type1_callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = properties.correlation_id)
ch.basic_ack(delivery_tag = method.delivery_tag)
queues_declare(channel)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_type1,
queue='type1')
channel.basic_consume(callback_type1_callback,
queue='type1_callback')
def type2(channel):
queues_declare(channel)
def callback_type2(ch, method, properties, body):
# XXX: do work !
channel.basic_publish(exchange='',
routing_key=properties.reply_to,
body='',
properties=pika.BasicProperties(
correlation_id = properties.correlation_id,
))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback_type2,
queue='type2')
所以,我的问题是:这是正确的方式来达到我想要的rabbitmq?有没有更好的方法来控制并行性并确保FIFO处理?