2017-01-23 42 views
0

我正在调查使用rabbitmq来管理我的应用程序的事件。更具体地说,我想:如何用保证的FIFO控制最大并行度?

  1. 确保我得到FIFO处理的每个队列事件:一个新的事件不处理,直到所有先前的事件已完全处理。
  2. 确保我可以控制并行执行的事件数量。

一个典型的例子是,我有200到800个队列,我想不允许超过8名工人并行。

我已决定使用n + 1个队列并且n + m的工人(N = 200〜800,m为8):

  • 第一类型的工人(n)的负责确保FIFO为 一个队列内的所有事件
  • 第二类型的工人(米)的只是在一个并行方式

这里执行的事件是伪代码:

def queues_declare(channel): 
    channel.queue_declare(queue='type1', durable=True) 
    channel.queue_declare(queue='type1_callback', durable=True) 
    channel.queue_declare(queue='type2', durable=True) 

def type1(channel): 
    def callback_type1(ch, method, properties, body): 
     channel.basic_publish(exchange='', 
           routing_key='type2', 
           body=body, 
           properties=pika.BasicProperties(
            reply_to = "type1_callback", 
            correlation_id = method.delivery_tag, 
            delivery_mode = 2, 
          )) 
    def callback_type1_callback(ch, method, properties, body): 
     ch.basic_ack(delivery_tag = properties.correlation_id) 
     ch.basic_ack(delivery_tag = method.delivery_tag) 

    queues_declare(channel) 
    channel.basic_qos(prefetch_count=1) 
    channel.basic_consume(callback_type1, 
          queue='type1') 
    channel.basic_consume(callback_type1_callback, 
          queue='type1_callback') 

def type2(channel): 
    queues_declare(channel) 

    def callback_type2(ch, method, properties, body): 
     # XXX: do work ! 
     channel.basic_publish(exchange='', 
           routing_key=properties.reply_to, 
           body='', 
           properties=pika.BasicProperties(
            correlation_id = properties.correlation_id, 
          )) 
     ch.basic_ack(delivery_tag = method.delivery_tag) 

    channel.basic_consume(callback_type2, 
          queue='type2') 

所以,我的问题是:这是正确的方式来达到我想要的rabbitmq?有没有更好的方法来控制并行性并确保FIFO处理?

回答

0

这里混合了几个问题。

  1. 确保FIFO顺序的唯一方法是使用单个队列进行序列化访问。并且使用很多只将消息重新发布到这个单一队列的工作人员实际上放松了这一保证 - 所以最好设置一个消息结构,使消息直接到达该队列。 无论如何,最大的缺点是你的性能受到CPU单核性能的限制。

  2. 有一种方法可以仅使用RabbitMQ本身来限制并发。您需要为此创建一个单独的队列,并预先填充与您所需的并发级别相等的消息量。然后,工人应该做的第一件事就是试着去获得这个信息,但不要承认它 - 所以这个信息将挂在这个没有受到感染的状态下,一辈子的工人。当工作人员死亡(或只是关闭AMQP连接)时,任何其他工作人员都可以访问该信息以获取该信息。但是,这又有一个缺点 - 只有在非集群环境中才能可靠地工作。例如。请参阅https://aphyr.com/posts/315-jepsen-rabbitmq,其中几乎完全正在测试此用例。