2013-05-03 32 views
3

我正在研究一个Python应用程序和后台线程,用于从RabbitMQ队列(主题方案)消费消息。Python Pika - 消费者进入线程

我在按钮的on_click事件上启动线程。 这里是我的代码,请关注“#self.receive_command()”。

def on_click_start_call(self,widget): 


    t_msg = threading.Thread(target=self.receive_command) 
    t_msg.start() 
    t_msg.join(0) 
    #self.receive_command() 


def receive_command(self): 

    syslog.syslog("ENTERED") 

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    syslog.syslog("1") 

    channel = connection.channel() 
    syslog.syslog("2") 

    channel.exchange_declare(exchange='STORE_CMD', type='topic') 
    syslog.syslog("3") 

    result = channel.queue_declare(exclusive=True) 
    syslog.syslog("4") 

    queue_name = result.method.queue 
    syslog.syslog("5") 

    def callback_rabbit(ch,method,properties,body): 
     syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n") 

    syslog.syslog("6") 

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key') 
    syslog.syslog("7") 

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True) 
    syslog.syslog("8") 

    channel.start_consuming() 

如果我运行此代码,我不能在syslog上看到消息1,2,3,5,6,7,8但我只能看到“已输入”。所以,代码被锁定在pika.BlokingConnection上。

如果我运行相同的代码(注释线程指令和分解直接调用函数),所有作为预期和消息都可以正确接收。

有什么解决方案可以将消费者运行到线程中?

由于提前

达维德

回答

6

我过我的机器上的代码,与鼠兔的最新版本。它工作正常。 Pika存在线程问题,但只要您为每个线程创建一个连接,就不会成为问题。

如果您遇到问题,很可能是因为旧版Pika中的错误,或者与导致问题的线程无关的问题。

我会建议您避免0.9.13,因为有多个错误,但 0.9.14 0.10.0应该很快就会发布。

[编辑] Pika 0.9.14已发布。

这是我使用的代码。

def receive_command(): 
    print("ENTERED") 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    print("1") 
    channel = connection.channel() 
    print("2") 
    channel.exchange_declare(exchange='STORE_CMD', type='topic') 
    print("3") 
    result = channel.queue_declare(exclusive=True) 
    print("4") 
    queue_name = result.method.queue 
    print("5") 
    def callback_rabbit(ch,method,properties,body): 
     print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n") 
    print("6") 
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key') 
    print("7") 
    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True) 
    print("8") 
    channel.start_consuming() 

def start(): 
    t_msg = threading.Thread(target=receive_command) 
    t_msg.start() 
    t_msg.join(0) 
    #self.receive_command() 
start() 
1

另一种方法是通过线程方法channel.start_consuming作为目标,然后只是通过你的回调consume方法。 用法:consume(callback=your_method, queue=your_queue)

import threading 

def consume(self, *args, **kwargs): 
    if "channel" not in kwargs \ 
      or "callback" not in kwargs \ 
      or "queue" not in kwargs \ 
      or not callable(kwargs["callback"]): 
     return None 

    channel = kwargs["channel"] 
    callback = kwargs["callback"] 
    queue = kwargs["queue"] 
    channel.basic_consume(callback, queue=queue, no_ack=True) 

    t1 = threading.Thread(target=channel.start_consuming) 
    t1.start() 
    t1.join(0)