2017-08-29 57 views
1

我正在使用multiprocessing python库产生4 Process()对象来并行化cpu密集型任务。这个任务(灵感和来自这个伟大的article的代码)是计算列表中每个整数的主要因子。python多处理.join()死锁取决于工作者函数

main.py:

import random 
import multiprocessing 
import sys 

num_inputs = 4000 
num_procs = 4 
proc_inputs = num_inputs/num_procs 
input_list = [int(1000*random.random()) for i in xrange(num_inputs)] 

output_queue = multiprocessing.Queue() 
procs  = [] 
for p_i in xrange(num_procs): 
    print "Process [%d]"%p_i 
    proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)] 
    print " - num inputs: [%d]"%len(proc_list) 

    # Using target=worker1 HANGS on join 
    p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue)) 
    # Using target=worker2 RETURNS with success 
    #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue)) 

    procs.append(p) 
    p.start() 

for p in jobs: 
    print "joining ", p, output_queue.qsize(), output_queue.full() 
    p.join() 
    print "joined ", p, output_queue.qsize(), output_queue.full() 

print "Processing complete." 
ret_vals = [] 
while output_queue.empty() == False: 
    ret_vals.append(output_queue.get()) 
print len(ret_vals) 
print sys.getsizeof(ret_vals) 

观察:

  • 如果每个进程的目标是功能worker1,用于将输入列表大于4000层的元件主线程得到卡在.join()上,等待产生的进程终止并永不返回。
  • 如果每个进程的目标是函数worker2,对于相同的输入列表,代码工作得很好,主线程返回。

这是非常混乱和我说话,worker1worker2(见下文)之间的唯一区别是,在Queue而后者插入每个进程列表中的一个列表前插入个人名单。

为什么会出现使用worker1而不使用worker2目标的死锁? 不应该都(或两者都不)超出Multiprocessing Queue maxsize limit is 32767


worker1 VS worker2:

def worker1(proc_num, proc_list, output_queue): 
    '''worker function which deadlocks''' 
    for num in proc_list: 
     output_queue.put(factorize_naive(num)) 

def worker2(proc_num, proc_list, output_queue): 
    '''worker function that works''' 
    workers_stuff = [] 

    for num in proc_list: 
     workers_stuff.append(factorize_naive(num)) 
    output_queue.put(workers_stuff) 

有对如此的相似问题进行很多,但我相信这个问题的核心是所有的人都明显不同。

相关链接:

回答

2

该文档提醒一下:

警告正如上面提到的,如果一个子进程已经把项目一个队列(现在也没有使用JoinableQueue。cancel_join_thread),那么该进程将不会终止,直到所有缓冲项目已被刷新到管道。 这意味着如果您尝试加入该进程,则可能会发生死锁,除非您确定放入队列中的所有项目都已被使用。同样,如果子进程是非守护进程,那么父进程在尝试加入所有非守护进程子进程时可能会在退出时挂起。

虽然Queue看起来是无界的,但在排队的项目被缓存在内存中以避免过载管道之间的过载。在这些内存缓冲区刷新之前,进程无法正常结束。你的worker1()放在的队列上,而不是你的worker2(),这就是它的全部。请注意,实现之前可以排队等待缓冲的内存中的项目数量未定义:它可以在OS和Python版本之间有所不同。

由于文档表明,正常的方式,以避免这是.get()所有项目关闭队列之前尝试.join()的过程。正如您发现的那样,是否需要这样做取决于每个工作进程已将多少项放入队列的未定义方式。

+0

非常详细的答复,谢谢。为了确保我的理解正确,您是在说“队列”是否缓冲到内存中取决于项目的数量,而不取决于存储项目的整体大小?如果是这样,你有理解为什么选择这个设计?再次感谢! – Matteo

+1

我会回答,除非因为“未定义”是事实,我不想花费时间挖掘当前使用的每个实现。但“答案”无关紧要:答案是,无论如何,解决方案是在尝试加入填充它的流程之前先消耗队列。除此之外的任何答案都只是在特定版本的你当时正在使用的Python –

+0

时间管理的好建议!谢谢!:) – Matteo