2013-09-25 68 views
2

我是Python新手,正在Python 2.7中开发一个应用程序。我正在使用由concurrent.futures库提供的线程池。一旦来自ThreadPool的线程启动,它需要等待来自RabbitMQ的一些消息。如何从python中的线程池中唤醒线程?

我该如何在Python中实现这个逻辑才能使该线程从池中等待事件消息?基本上,当我收到来自RabbitMQ的消息时(即等待并通知ThreadPool执行),我需要唤醒一个等待线程。

回答

3

首先定义一个Queue

from Queue import Queue 

q = Queue() 

然后,在你的线程,你尝试从队列中取得一个项目:

msg = q.get() 

这将阻止整个线程,直到有东西在队列中找到。

def on_message(msg): 
    q.put(msg) 
rabbitmq_channel.register_callback(on_message) 

,或者如果你喜欢短:现在

,在同一时间,假设你的到来的事件被触发回调,你注册一个回调,简单地把收到的RabbitMQ的消息队列中来通知代码:

rabbitmq_channel.register_callback(lambda msg: q.put(msg)) 

(上面是伪代码,因为我还没有使用RabbitMQ的RabbitMQ的,也不任何Python绑定,但你应该能够很容易地找出如何片断适应您的实际应用程序代码;关键部分要注意的是q.put(msg) - 只要确认通知了新消息,就立即调用该部分。)

只要发生这种情况,线程就会被唤醒并可以自由处理消息。为了重用多个消息在同一个线程,只需使用一个while循环:

while True: 
    msg = q.get() 
    process_message(msg) 

附:我会建议看看Gevent以及如何将它与RabbitMQ结合在您的Python应用程序中,以便能够摆脱线程并使用更轻量级和可扩展的绿色线程机制,而无需管理线程池(因为您可以拥有成千上万的greenlet在飞行中产生并杀死):

# this thing always called in a green thread; forget about pools and queues. 
def on_message(msg): 
    # you're in a green thread now; just process away! 
    benefit_from("all the gevent goodness!") 
    spawn_and_join_10_sub_greenlets() 

rabbitmq_channel.register_callback(lambda msg: gevent.spawn(on_message, msg)) 
+0

另外考虑'threading.Event'是你不需要处理消息。 – Veedrac

+0

@Erik完全同意你的看法。但是,如果池中有超过100个线程,则需要保持相同数量的调用回调方法被调用的队列对象。我正在考虑采用更优化的方式来处理这种情况,即更多数量的队列不需要维护。 – Mandy

+0

@Veedrac与线程相同的情况,我猜。我需要保持这么多的事件。 – Mandy