2017-03-01 80 views
0

当我手动删除我的PikaClient消耗队列,没有发生。我可以重新创建具有相同名称的队列,但通道已停止使用队列(正常情况下,因为我已将其删除)。但是我希望在消耗的队列被删除时收到一个事件。如何检测队列已被删除?

我预计通道会自动关闭,但«on_channel_close_callback»永远不会被调用。 «basic_consume»不提供关闭任何回调。 另外重要的一点,我必须使用TornadoConnection。

鼠兔:0.10.0 的Python:2.7 旋风:4.3

谢谢您的帮助。

class PikaClient(object): 

    def __init__(self): 
     # init everything here 

    def connect(self): 
     pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected) 

    def on_connected(self, connection): 
     self.logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     self.connection.channel(self.on_channel_open) 

    def on_open_error_callback(self, *args): 
     self.logger.error("on_open_error_callback") 

    def on_channel_open(self, channel): 
     channel.add_on_close_callback(self.on_channel_close_callback) 

     channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True) 

    def on_channel_close_callback(self, reply_code, reply_text): 
     self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text) 

回答

0

我找到了解决方法。 如果我的PikaClient已经消费了消息,我会每隔X秒检查一次。如果没有,我重新启动将自动创建一个队列的应用程序。

如果您有更好的解决方案,我仍然乐于提供建议。

def __init__(self): 
    ... 
    self.have_messages_been_consumed = False 

def on_connected(self, connection): 
    self.logger.info('PikaClient: connected to RabbitMQ') 
    self.connected = True 
    self.connection = connection 
    self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    self.connection.channel(self.on_channel_open) 

def check_if_messages_have_been_consumed(self): 
    if self.have_messages_been_consumed: 
     self.have_messages_been_consumed = False 
     self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    else: 
     # close_and_restart will set to False have_messages_been_consumed 
     self.close_and_restart() 

def on_message(self, channel, basic_deliver, header, body): 
    self.have_messages_been_consumed = True 
    ...