
I am facing a problem with the example code below: a Python multiprocessing pipe "deadlock".

from multiprocessing import Lock, Process, Queue, current_process

def worker(work_queue, done_queue):
    for item in iter(work_queue.get, 'STOP'):
        print("adding ", item, "to done queue")
        #this works: done_queue.put(item*10)
        done_queue.put(item*1000) #this doesn't!
    return True

def main():
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for x in range(10):
        work_queue.put("hi"+str(x))

    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    for p in processes:
        p.join()

    done_queue.put('STOP')

    for item in iter(done_queue.get, 'STOP'):
        print(item)


if __name__ == '__main__':
    main()

Once the done queue becomes large enough (around 64 KB, I think that is the limit), the whole thing freezes without any further notice.

What is the general approach to this situation, when the queue grows too large? Is there some way to remove elements on the fly once they have been processed? The Python docs recommend removing the p.join(), but in a real application I cannot estimate when the processes will have finished. Is there a simple solution besides looping infinitely and using .get_nowait()?
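For illustration, the .get_nowait() fallback mentioned above would look roughly like this (a busy-polling sketch; the helper name drain_until_done is just for the example):

import queue  # multiprocessing's get_nowait() raises queue.Empty when nothing is ready

def drain_until_done(processes, done_queue):
    # Keep emptying done_queue while any worker is still alive,
    # so the underlying pipe buffer never fills up and blocks the workers.
    while any(p.is_alive() for p in processes):
        try:
            print(done_queue.get_nowait())
        except queue.Empty:
            pass
    # Pick up anything put on the queue just before the last worker exited.
    while True:
        try:
            print(done_queue.get_nowait())
        except queue.Empty:
            break
    # (This polling pattern is easy to get subtly wrong, which is why it is not ideal.)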


This works for me on CPython 2.6, 2.7, 3.0, 3.1, 3.2, 3.3 and 3.4alpha4. 2.5 does not include the multiprocessing module. What version of Python are you using? – dstromberg


I am using 3.3. Try increasing the number from 1000 to something higher; the pipe size limit depends on the operating system. – Stefan


Have you seen *"This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined."* in the docs? There is even example code there that is supposed to deadlock. 'done_queue' must be empty before p.join() is called. Remove the 'p.join()'. Add try: ... finally: done_queue.put('STOP') in the worker and repeat the iter(done_queue.get, 'STOP') loop len(processes) times. – jfs
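A minimal sketch of what that comment describes, applied to the question's code (my reading of the suggestion, not verified on every Python version): each worker always puts its own 'STOP' sentinel in a finally block, and the parent drains done_queue until it has seen one sentinel per worker before joining.

from multiprocessing import Process, Queue

def worker(work_queue, done_queue):
    try:
        for item in iter(work_queue.get, 'STOP'):
            done_queue.put(item * 1000)
    finally:
        done_queue.put('STOP')  # always signal, even if the worker dies early

def main():
    workers = 4
    work_queue = Queue()
    done_queue = Queue()
    processes = []

    for x in range(10):
        work_queue.put("hi" + str(x))

    for _ in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')

    # Drain done_queue *before* joining: read until one 'STOP' per worker has arrived.
    stops = 0
    while stops < len(processes):
        item = done_queue.get()
        if item == 'STOP':
            stops += 1
        else:
            print(item)

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()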

Answer


This works for me on 3.4.0alpha4, 3.3, 3.2, 3.1 and 2.6. It tracebacks on 2.7 and 3.0, by the way.

#!/usr/local/cpython-3.3/bin/python

'''SSCCE for a queue deadlock'''

import sys
import multiprocessing

def worker(workerno, work_queue, done_queue):
    '''Worker function'''
    #reps = 10 # this worked for the OP
    #reps = 1000 # this worked for me
    reps = 10000 # this didn't

    for item in iter(work_queue.get, 'STOP'):
        print("adding", item, "to done queue")
        #this works: done_queue.put(item*10)
        for thing in item * reps:
            #print('workerno: {}, adding thing {}'.format(workerno, thing))
            done_queue.put(thing)
    done_queue.put('STOP')
    print('workerno: {0}, exited loop'.format(workerno))
    return True

def main():
    '''main function'''
    workers = 4
    work_queue = multiprocessing.Queue(maxsize=0)
    done_queue = multiprocessing.Queue(maxsize=0)
    processes = []

    for integer in range(10):
        work_queue.put("hi"+str(integer))

    for workerno in range(workers):
        dummy = workerno
        process = multiprocessing.Process(target=worker, args=(workerno, work_queue, done_queue))
        process.start()
        processes.append(process)
        work_queue.put('STOP')

    itemno = 0
    stops = 0
    while True:
        item = done_queue.get()
        itemno += 1
        sys.stdout.write('itemno {0}\r'.format(itemno))
        if item == 'STOP':
            stops += 1
            if stops == workers:
                break
    print('exited done_queue empty loop')

    for workerno, process in enumerate(processes):
        print('attempting process.join() of workerno {0}'.format(workerno))
        process.join()

    done_queue.put('STOP')

if __name__ == '__main__':
    main()

HTH


Thank you for your answer, but after looking at Pool, it seems to be the simpler way to solve the problem. – Stefan
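For reference, a rough sketch of the Pool-based version mentioned here (Pool collects results internally, so there is no queue to drain by hand; the function name work is only illustrative):

from multiprocessing import Pool

def work(item):
    return item * 1000

if __name__ == '__main__':
    items = ["hi" + str(x) for x in range(10)]
    # Pool supports the with-statement on Python 3.3+.
    with Pool(processes=4) as pool:
        for result in pool.map(work, items):
            print(result)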