2013-12-12 53 views
0

我写了一个脚本,使用2个队列和3种类型的worker:producer,consumer(CPU绑定任务),writer(我需要依次编写结果)。两个队列:脚本没有退出

这是我的代码的简化版本:

from queue import Queue 
from threading import Thread 

def compute_single_score(data): 
    #do lots of calculations 
    return 0.0 

def producer(out_q, data_to_compute): 
    while stuff: 
     data = data_to_compute.popitem() 
     out_q.put(data) 
    out_q.put(_sentinel) 

def consumer(in_q, out_q): 
    while True: 
     data = in_q.get() 
     if data is _sentinel: 
      in_q.put(_sentinel) 
      break 
     out_q.put([data[0], compute_single_score(*data)]) 
     in_q.task_done() 

def writer(in_q): 
    while True: 
     data = in_q.get() 
     if data is _sentinel: 
      in_q.put(_sentinel) 
      break 
     in_q.task_done() 

if __name__ == '__main__': 
    _sentinel = object() 
    jobs_queue = Queue() 
    scores_queue = Queue() 

    t1 = Thread(target=producer, args=(jobs_queue, data_to_compute,)) 
    t2 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t3 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t4 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t5 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t6 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t7 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t8 = Thread(target=consumer, args=(jobs_queue,scores_queue,)) 
    t9 = Thread(target=writer, args=(scores_queue,)) 

    t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); t6.start(); t7.start(); t8.start(); t9.start() 

    jobs_queue.join() 
    scores_queue.join() 
    print('File written') 

它立即打印出来的书面文件“,而不是等待队列为空。因此,尽管执行了所有计算,但脚本不会退出。两条线程似乎仍然活跃。

非常感谢您的支持。

回答

1

它确实等待队列为空。但由于在队列中发生事件发生在线程中,因此它会快速到达.join()行,然后发生.put()。所以当它达到.join()队列是空的。

现在我不确定你试图实现什么,只是因为制片人有一个while stuff循环。我假设你想继续处理,直到这个条件成立。特别是你必须要等到t1线程退出,即

t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); t6.start(); t7.start(); t8.start(); t9.start() 

t1.join() # <-- this is important 
jobs_queue.join() 
scores_queue.join() 
print('File written') 

否则,你将无法进行同步。

附注1:由于GIL,创建CPU绑定线程没有意义。如果你的线程没有做任何IO(而他们不这样做),那么单线程时它会更好。至少有多个consumer线程是毫无意义的。

附注2:请勿使用逗号。这不是pythonic。相反,这样做:

threads = [] 
threads.append(Thread(target=producer, args=(jobs_queue, data_to_compute,))) 
threads.append(Thread(target=writer, args=(scores_queue,))) 
for i in range(10): 
    threads.append(Thread(target=consumer, args=(jobs_queue,scores_queue,))) 

for t in threads: 
    t.start() 

threads[0].join() 

附注3:你应该处理的情况下,当队列为空。 data = in_q.get()将永远阻塞,这意味着您的脚本不会退出(除非线程标记为daemon)。你应该例如做:

try: 
    data = in_q.get(timeout=1) 
except queue.Empty: 
    # handle empty queue here, perhaps quit if t1 is not alive 
    # otherwise just continue the loop 

    if not t1.is_alive(): # <-- you have to pass t1 to the thread 
     break 
    else: 
     continue 

再加入所有线程末(见侧面说明2)主线程:

for t in threads: 
    t.start() 
for t in threads: 
    t.join() 
print('File written') 

现在你甚至不用到加入队列。

+0

你有什么建议来解决(边注1)?流程?在这种情况下,我如何修改脚本?非常感谢您的帮助,我非常感谢。 – filannim

+1

@michele如果你在做一些cpu繁重的工作,那么我建议不要首先使用Python。你可以玩C插件,但有一个单独的C/C++服务可能会更好。如果你真的必须坚持Python,那么看看多处理模块。产生更多进程是利用Python中的多核心的唯一方式。 – freakish

+0

它仍然不退出。 :( – filannim

0

这是我在结束(根据之前说明的要求)使用的代码:

from multiprocessing import JoinableQueue 
from multiprocessing import Process 

def compute_single_score(data): 
    #do lots of calculations 
    return 0.0 

def producer(out_q, data_to_compute): 
    while stuff: 
     data = data_to_compute.popitem() 
     out_q.put(data) 

def consumer(in_q, out_q): 
    while True: 
     try: 
      data = in_q.get(timeout=5) 
     except: 
      break 
     out_q.put([data[0], compute_single_score(*data)]) 
     in_q.task_done() 

def writer(in_q): 
    while True: 
     try: 
      data = in_q.get(timeout=5) 
     except: 
      break 
     #write 
     in_q.task_done() 

if __name__ == '__main__': 
    jobs_queue = JoinableQueue() 
    scores_queue = JoinableQueue() 

    processes = [] 
    processes.append(Process(target=producer, args=(jobs_queue, data_to_compute,))) 
    processes.append(Process(target=writer, args=(scores_queue,))) 
    for i in range(10): 
     processes.append(Process(target=consumer, args=(jobs_queue,scores_queue,))) 

    for p in processes: 
     p.start() 

    processes[1].join() 
    scores_queue.join() 

    print('File written') 

我希望这将是对别人的帮助。