2016-04-06 39 views
1

我正在将一个线程进程重写为一个多处理队列,以尝试加速大量计算。我已经得到了95%的路,但我不知道如何使用multiprocessingQueue为空时发出信号。Python多处理 - '队列'对象没有任何属性'task_done'/'join'


我原来的代码是这样的:

import Queue 
from threading import Thread 

num_fetch_threads = 4 
enclosure_queue = Queue() 

for i in range(num_fetch_threads): 
    worker = Thread(target=run_experiment, args=(i, enclosure_queue)) 
    worker.setDaemon(True) 
    worker.start() 

for experiment in experiment_collection: 
    enclosure_queue.put((experiment, otherVar)) 

enclosure_queue.join() 

和队列功能是这样的:

def run_experiment(i, q): 
    while True: 
    ... do stuff ... 
    q.task_done() 

我的新代码出头这样的:

from multiprocessing import Process, Queue 

num_fetch_threads = 4 
enclosure_queue = Queue() 

for i in range(num_fetch_threads): 
    worker = Process(target=run_experiment, args=(i, enclosure_queue)) 
    worker.daemon = True 
    worker.start() 

for experiment in experiment_collection: 
    enclosure_queue.put((experiment, otherVar)) 

worker.join() ## I only put this here bc enclosure_queue.join() is not available 

而新队列功能:

def run_experiment(i, q): 
    while True: 
    ... do stuff ... 
    ## not sure what should go here 

我一直在阅读的文档和谷歌,但无法弄清楚什么我失踪 - 我知道task_done/join不是的一部分multiprocessingQueue类,但它不清楚我应该使用什么。

“他们在不同的队列中缺少task_done()和join()引入的Python 2.5的Queue.Queue类方法 。” Source

但是,如果没有这些,我不知道队列是如何知道它已完成,以及如何继续使用该程序。

回答

2

考虑使用multiprocessing.Pool而不是手动管理工作人员。池处理调度任务给工人,具有方便的功能,如地图和应用,并支持.close.join方法。 Pool负责处理进程之间的队列并处理结果。以下是您的代码如何使用multiprocessing.Pool

from multiprocessing import Pool 

def do_experiment(exp): 
    # run the experiment `exp`, will be called by `p.map` 
    return result 

p = Pool() # automatically scales to the number of CPUs available 

results = p.map(do_experiment, experiment_collection) 
p.close() 
p.join() 
相关问题