我写了一个脚本,使用2个队列和3种类型的worker:producer,consumer(CPU绑定任务),writer(我需要依次编写结果)。两个队列:脚本没有退出
这是我的代码的简化版本:
from queue import Queue
from threading import Thread
def compute_single_score(data):
#do lots of calculations
return 0.0
def producer(out_q, data_to_compute):
while stuff:
data = data_to_compute.popitem()
out_q.put(data)
out_q.put(_sentinel)
def consumer(in_q, out_q):
while True:
data = in_q.get()
if data is _sentinel:
in_q.put(_sentinel)
break
out_q.put([data[0], compute_single_score(*data)])
in_q.task_done()
def writer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
in_q.put(_sentinel)
break
in_q.task_done()
if __name__ == '__main__':
_sentinel = object()
jobs_queue = Queue()
scores_queue = Queue()
t1 = Thread(target=producer, args=(jobs_queue, data_to_compute,))
t2 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t3 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t4 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t5 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t6 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t7 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t8 = Thread(target=consumer, args=(jobs_queue,scores_queue,))
t9 = Thread(target=writer, args=(scores_queue,))
t1.start(); t2.start(); t3.start(); t4.start(); t5.start(); t6.start(); t7.start(); t8.start(); t9.start()
jobs_queue.join()
scores_queue.join()
print('File written')
它立即打印出来的书面文件“,而不是等待队列为空。因此,尽管执行了所有计算,但脚本不会退出。两条线程似乎仍然活跃。
非常感谢您的支持。
你有什么建议来解决(边注1)?流程?在这种情况下,我如何修改脚本?非常感谢您的帮助,我非常感谢。 – filannim
@michele如果你在做一些cpu繁重的工作,那么我建议不要首先使用Python。你可以玩C插件,但有一个单独的C/C++服务可能会更好。如果你真的必须坚持Python,那么看看多处理模块。产生更多进程是利用Python中的多核心的唯一方式。 – freakish
它仍然不退出。 :( – filannim