2013-05-31 38 views
9

我正在寻找一个Python类(最好是标准语言的一部分,而不是第三方库)来管理异步“广播式”消息。需要一个线程安全的异步消息队列

我将有一个线程将消息放入队列('putMessageOnQueue'方法不能阻塞),然后多个其他线程将全部等待消息,大概称为某种阻塞'waitForMessage'函数。当一条消息放在队列中时,我希望每个等待线程都获得它自己的消息副本。

我看过内置的Queue类,但我不认为这是合适的,因为消费消息似乎涉及从队列中移除它们,所以只有1个客户端线程会看到每个线程。

这似乎应该是一个常见的用例,任何人都可以推荐一个解决方案吗?

+0

我相信你可以建立自己的班级,保持跟踪哪个线程得到哪个消息,没有太多问题。 – Bakuriu

回答

7

我认为对此的典型方法是为每个线程使用单独的消息队列,并将消息推送到之前已注册接收此类消息的每个队列中。

像这样的东西应该工作,但它是未经测试的代码......

from time import sleep 
from threading import Thread 
from Queue import Queue 

class DispatcherThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(DispatcherThread, self).__init__(*args, **kwargs) 
     self.interested_threads = [] 

    def run(self): 
     while 1: 
      if some_condition: 
       self.dispatch_message(some_message) 
      else: 
       sleep(0.1) 

    def register_interest(self, thread): 
     self.interested_threads.append(thread) 

    def dispatch_message(self, message): 
     for thread in self.interested_threads: 
      thread.put_message(message) 



class WorkerThread(Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.queue = Queue() 


    def run(self): 

     # Tell the dispatcher thread we want messages 
     dispatcher_thread.register_interest(self) 

     while 1: 
      # Wait for next message 
      message = self.queue.get() 

      # Process message 
      # ... 

    def put_message(self, message): 
     self.queue.put(message) 


dispatcher_thread = DispatcherThread() 
dispatcher_thread.start() 

worker_threads = [] 
for i in range(10): 
    worker_thread = WorkerThread() 
    worker_thread.start() 
    worker_threads.append(worker_thread) 

dispatcher_thread.join() 
+0

完美,这很好!可惜没有现成的版本,但是我认为一旦有人解释清楚(正如你所做的那样),原理并不复杂。 – codebox

+0

@codebox嗯,在['multiprocessing'](http://docs.python.org/2/library/multiprocessing.html)模块中有更好的支持,但这是为了子进程而不是线程。我想这是因为进程间通信通常比线程间通信更复杂,因为线程自然共享相同的堆。 – Aya

2

我觉得这是一个比较简单的例子(从队列例如Python Lib拍摄)

from threading import Thread 
from Queue import Queue 


num_worker_threads = 2 

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+0

这是如何满足问题的要求的?他明确表示队列不起作用,因为每个线程都需要该项目的副本。 – Wlerin