2012-12-11 106 views
0

假设我有一个线程池。这个线程池使用两个队列q1q2。它从q1读取并在q2上写入新项目。当q1为空时,我们交换两个队列q1, q2, = q2, q1,我们重复该过程,直到两者都为空。为了同步线程,我使用另一个只包含一个项目的队列,并在流程结束时删除此项目。使用两个队列同步线程

我认为这是一个非常愚蠢的做法。任何改进建议?

一个非常漂亮的简单例子是docs,但只有一个队列。我的解决方案看起来并不很漂亮,如果是正确的:

global flag 
global lock 
global barrier 
global q1 
global q2 
global q 
while True: 
    if q1.empty(): 
     flag = False 
     barrier.wait() # wait for all the theads to reach this point. 
     # execute the code of swapping queues only once 
     with lock: 
      if not flag: 
       flag = True 
       if q2.empty(): 
        q.get() 
        q.task_done() 
       else: 
        q1, q2 = q2, q1 

    process_items_in_q1() 
+2

您正在使用2个输入队列的原因吗?如果你有一个工作者线程池,为什么没有一个输入队列让工作者从中获得工作?先进先出?双输入队列是否是一项要求? – user1836293

+0

我正在实施BFS +我们发现的每个节点的一些东西。每个阶段的seaching可以并行化,但是当一个阶段完成后,才可以转到下一个阶段。当然,除非我错过了一个更好的主意。 –

回答

0

我觉得你的aproach工作。这里有两个人,你可能会或可能不会找到更好:

  1. 只能使用一个队列,推动N种标记分离的“级别”,其中N是线程数。当线程读取标记时,它只会调用barrier.wait()。如果目标仅仅是确保所有N个线程在关卡之间调用barrier.wait(),那么这应该足够了。

  2. 或者,您可以简化上述代码:使用2个普通列表而不是2个队列。所有线程从list1弹出并附加到list2,这不需要队列逻辑中的特殊照顾。另外,如果“levels”每个都相对较大,那么可以简单地为每个级别重复创建线程的整个代码,并等待它们全部完成。这将使得一个时间一个一个的逻辑更加明显。

(最后,像往常一样,请注意,到目前为止,你不要在Python中使用线程获得任何的处理能力。还有的GIL)

+0

感谢您的输入! 标记的替代解决方案应该是等效的。我最好的猜测是具有task_done的队列机制可以抽象出障碍。我没有看过它。 至于列表,他们实施低效率的'pop()'操作。至于GIL,我的process_items是I/O绑定的,所以线程正常。 –

+0

对不起,我记住'list1.pop()'没有参数,即弹出列表的最后一个元素(这是有效的)。如果需要的话,你可以在从list1移动到list2时将列表反转()。或者使用'collection.deque'。 –