2011-07-12 90 views
10

我想在Python中使用多处理库中的队列。在执行下面的代码之后(打印语句正常工作),但在我调用加入队列并且仍然存在之后,进程不会退出。我怎样才能终止剩余的进程?Python中的多处理队列

谢谢!

def MultiprocessTest(self): 
    print "Starting multiprocess." 
    print "Number of CPUs",multiprocessing.cpu_count() 

    num_procs = 4 
    def do_work(message): 
    print "work",message ,"completed" 

    def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

    q = multiprocessing.JoinableQueue() 
    for i in range(num_procs): 
    p = multiprocessing.Process(target=worker) 
    p.daemon = True 
    p.start() 

    source = ['hi','there','how','are','you','doing'] 
    for item in source: 
    q.put(item) 
    print "q close" 
    q.join() 
    #q.close() 
    print "Finished everything...." 
    print "num active children:",multiprocessing.active_children() 

回答

7

试试这个:

import multiprocessing 

num_procs = 4 
def do_work(message): 
    print "work",message ,"completed" 

def worker(): 
    for item in iter(q.get, None): 
    do_work(item) 
    q.task_done() 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    procs.append(multiprocessing.Process(target=worker)) 
    procs[-1].daemon = True 
    procs[-1].start() 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

q.join() 

for p in procs: 
    q.put(None) 

q.join() 

for p in procs: 
    p.join() 

print "Finished everything...." 
print "num active children:", multiprocessing.active_children() 
+0

你有没有任何理由在完成后将None放入队列中?我认为task_done()可以帮助避免这个问题?我试图在本页面底部的示例之后对代码进行建模:http://docs.python.org/library/queue.html – aerain

+0

这实际上不工作:( – aerain

+0

未对解决方案进行评级,但暗示如何让它运行:在def worker的第一次使用之前移动“q =”声明行()... ;-) – Dilettant

3

在参加该过程之前清除队列,但q.empty()是不可靠的。

清除队列的最好方法是统计成功获取或循环的次数,直到您收到一个sentinel值,就像拥有可靠网络的套接字一样。

6

你的工作人员需要一个哨兵终止,否则他们只会坐在封锁的阅读。需要注意的是使用在Q睡眠,而不是加入在P,可以显示状态信息等
我的最佳模板是:

def worker(q,nameStr): 
    print 'Worker %s started' %nameStr 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 
    q.task_done() 
    print 'Worker %s Finished' % nameStr 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    nameStr = 'Worker_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    p.daemon = True 
    p.start() 
    procs.append(p) 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

for i in range(num_procs): 
    q.put(None) # send termination sentinel, one for each process 

while not q.empty(): # wait for processing to finish 
    sleep(1) # manage timeouts and status updates etc. 
+1

虽然不是q.empty(),但是只有当工作人员抓住要完成的最后一项工作时,才能知道处理已完成。坦率地说,如果你不恰当地使用JoinableQueue,你不需要一个JoinableQueue。如果您选择不使用某个工作线程,则不需要工作线程标记task_done。使用这样一个队列的目的是让你可以加入它,这正是你在这个程序结束时所要做的,而不是等待队列为空。 – leetNightshade

+0

是的,用这种方法,工作提前结束。 – Forethinker

1

下面的代码可能不是很相关,但我将它张贴您的意见/反馈,所以我们可以一起学习。谢谢!

import multiprocessing 

def boss(q,nameStr): 
    source = range(1024) 
    for item in source: 
    q.put(nameStr+' '+str(item)) 
    q.put(None) # send termination sentinel, one for each process 

def worker(q,nameStr): 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 

q = multiprocessing.Queue() 

procs = [] 

num_procs = 4 
for i in range(num_procs): 
    nameStr = 'ID_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    procs.append(p) 
    p = multiprocessing.Process(target=boss, args=(q,nameStr)) 
    procs.append(p) 

for j in procs: 
    j.start() 
for j in procs: 
    j.join() 
0

这里是你把一些任务上JoinableQueue,然后启动消耗的任务和退出,一旦他们阅读的队列“干”的工作进程比较简单的情况下,前哨免费方法。诀窍是使用JoinableQueue.get_nowait()而不是get()。顾名思义,get_nowait()试图以非阻塞的方式从队列中获取值,并且如果没有什么可以获得,则会引发一个queue.Empty异常。工人通过退出来处理这个异常。

基本的代码来说明一个道理:

import multiprocessing as mp 
from queue import Empty 

def worker(q): 
    while True: 
    try: 
     work = q.get_nowait() 
     # ... do something with `work` 
     q.task_done() 
    except Empty: 
     break # completely done 

# main 
worknum = 4 
jq = mp.JoinableQueue() 

# fill up the task queue 
# let's assume `tasks` contains some sort of data 
# that your workers know how to process 
for task in tasks: 
    jq.put(task) 

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ] 
for p in procs: 
    p.start() 

for p in procs: 
    p.join() 

的好处是,你不需要把“毒丸”上的队列,以便代码是有点短。

重要:在生产者和消费者使用相同的队列中“交织”的方式与工人可能要等待新的任务来沿着更复杂的情况下,“毒丸”的做法应该是用过的。我上面的建议是针对工人“知道”如果任务队列是空的简单情况,那么再没有任何问题了。