我正在使用multiprocessing
python库产生4 Process()
对象来并行化cpu密集型任务。这个任务(灵感和来自这个伟大的article的代码)是计算列表中每个整数的主要因子。python多处理.join()死锁取决于工作者函数
main.py:
import random
import multiprocessing
import sys
num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]
output_queue = multiprocessing.Queue()
procs = []
for p_i in xrange(num_procs):
print "Process [%d]"%p_i
proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
print " - num inputs: [%d]"%len(proc_list)
# Using target=worker1 HANGS on join
p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
# Using target=worker2 RETURNS with success
#p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))
procs.append(p)
p.start()
for p in jobs:
print "joining ", p, output_queue.qsize(), output_queue.full()
p.join()
print "joined ", p, output_queue.qsize(), output_queue.full()
print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)
观察:
- 如果每个进程的目标是功能
worker1
,用于将输入列表大于4000层的元件主线程得到卡在.join()
上,等待产生的进程终止并永不返回。 - 如果每个进程的目标是函数
worker2
,对于相同的输入列表,代码工作得很好,主线程返回。
这是非常混乱和我说话,worker1
和worker2
(见下文)之间的唯一区别是,在Queue
而后者插入每个进程列表中的一个列表前插入个人名单。
为什么会出现使用worker1
而不使用worker2
目标的死锁? 不应该都(或两者都不)超出Multiprocessing Queue maxsize limit is 32767?
worker1 VS worker2:
def worker1(proc_num, proc_list, output_queue):
'''worker function which deadlocks'''
for num in proc_list:
output_queue.put(factorize_naive(num))
def worker2(proc_num, proc_list, output_queue):
'''worker function that works'''
workers_stuff = []
for num in proc_list:
workers_stuff.append(factorize_naive(num))
output_queue.put(workers_stuff)
有对如此的相似问题进行很多,但我相信这个问题的核心是所有的人都明显不同。
相关链接:
- https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
- python multiprocessing issues
- python multiprocessing - process hangs on join for large queue
- Process.join() and queue don't work with large numbers
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Script using multiprocessing module does not terminate
- Why does multiprocessing.Process.join() hang?
- When to call .join() on a process?
- What exactly is Python multiprocessing Module's .join() Method Doing?
非常详细的答复,谢谢。为了确保我的理解正确,您是在说“队列”是否缓冲到内存中取决于项目的数量,而不取决于存储项目的整体大小?如果是这样,你有理解为什么选择这个设计?再次感谢! – Matteo
我会回答,除非因为“未定义”是事实,我不想花费时间挖掘当前使用的每个实现。但“答案”无关紧要:答案是,无论如何,解决方案是在尝试加入填充它的流程之前先消耗队列。除此之外的任何答案都只是在特定版本的你当时正在使用的Python –
时间管理的好建议!谢谢!:) – Matteo