2011-11-18 29 views
4

我需要能够优雅地阻止在Pika ioloop中工作的消费者(工作人员)。工人应该在60秒后停下来。当前处理的消息应该完成。Pika ioloop异步套装超时时间(RabbitMQ)

我试图把一个connection.close()在回调函数内,但只停止当前线程而不是完整的ioloop。它给了一个可怕的错误输出。

请参阅第16行,并在我的代码如下:我用的(约鼠兔基本的例子ioloop http://pika.github.com/connecting.html#cps-example

from pika.adapters import SelectConnection 
    channel = None 
    def on_connected(connection): 
     connection.channel(on_channel_open) 

    def on_channel_open(new_channel): 
     global channel 
     channel = new_channel 
     channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared) 

    def on_queue_declared(frame): 
     channel.basic_consume(handle_delivery, queue='test') 

    def handle_delivery(channel, method, header, body): 
     print body 

     # timer stuff which did NOT work 
     global start_time, timeout, connection 
     time_diff = time.time()-start_time 
     if time_diff > timeout: 
      #raise KeyboardInterrupt 
      connection.close() 

    timeout = 60 
    start_time = time.time() 

    connection = SelectConnection(parameters, on_connected) 

    try: 
     connection.ioloop.start() 
    except KeyboardInterrupt: 
     connection.close() 
     connection.ioloop.start() 

回答

9

您可以打开的连接上附加一个超时回调函数 这里是。 。您例如额外的代码

timeout = 60 

def on_timeout(): 
    global connection 
    connection.close() 

connection.add_timeout(timeout, on_timeout) 
-4

你可以尝试使用:

connection.ioloop.stop()