2016-03-08 68 views
0

我目前正在使用pika网站上提供的async示例消费者,并且想知道是否有可能让一个消费者消费两个队列? rabbitmq网站上的示例似乎仅适用于每个队列的一位消费者。消耗两个队列rabbitmq pika python

回答

1

您只需声明另一个队列(QUEUE_2 ='another_queue'),并修改一些方法。你有我修改的方法:

def on_exchange_declareok(self, unused_frame): 
    """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC 
    command. 

    :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame 

    """ 
    LOGGER.info('Exchange declared') 
    self.setup_queue(self.QUEUE) 
    self.setup_queue(self.QUEUE_2) 

def setup_queue(self, queue_name): 
    """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC 
    command. When it is complete, the on_queue_declareok method will 
    be invoked by pika. 

    :param str|unicode queue_name: The name of the queue to declare. 

    """ 
    LOGGER.info('Declaring queue %s', queue_name) 

    self._channel.queue_declare(self.on_queue_declareok, self.QUEUE) 
    self._channel.queue_declare(self.on_queue_declareok, self.QUEUE_2) 


def on_queue_declareok(self, method_frame): 
    """Method invoked by pika when the Queue.Declare RPC call made in 
    setup_queue has completed. In this method we will bind the queue 
    and exchange together with the routing key by issuing the Queue.Bind 
    RPC command. When this command is complete, the on_bindok method will 
    be invoked by pika. 

    :param pika.frame.Method method_frame: The Queue.DeclareOk frame 

    """ 
    LOGGER.info('Binding %s to %s with %s', 
       self.EXCHANGE, self.QUEUE, self.ROUTING_KEY) 
    self._channel.queue_bind(self.on_bindok, self.QUEUE, 
          self.EXCHANGE, self.ROUTING_KEY) 
    LOGGER.info('Binding %s to %s with %s', 
       self.EXCHANGE, self.QUEUE_2, self.ROUTING_KEY) 
    self._channel.queue_bind(self.on_bindok, self.QUEUE_2, 
          self.EXCHANGE, self.ROUTING_KEY) 


def start_consuming(self): 
    """This method sets up the consumer by first calling 
    add_on_cancel_callback so that the object is notified if RabbitMQ 
    cancels the consumer. It then issues the Basic.Consume RPC command 
    which returns the consumer tag that is used to uniquely identify the 
    consumer with RabbitMQ. We keep the value to use it when we want to 
    cancel consuming. The on_message method is passed in as a callback pika 
    will invoke when a message is fully received. 

    """ 
    LOGGER.info('Issuing consumer related RPC commands') 
    self.add_on_cancel_callback() 
self._chalnnel.basic_qos(prefetch_count=1) 
self._consumer_tag = self._channel.basic_consume(self.on_message, 
               self.QUEUE) 

self._consumer_tag = self._channel.basic_consume(self.on_message, 
               self.QUEUE_2)