2012-06-15 153 views
9

使用Pool.apply_async运行大量任务(使用大参数)时,会分配进程并进入等待状态,并且等待进程的数量没有限制。这可以通过吃所有的记忆结束,如下面的例子:Python多处理:如何限制等待进程的数量?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

我在寻找一种方式来限制等待队列,以这样一种方式,只是在那里等待处理的数量有限,并且Pool.apply_async在等待队列已满时被阻塞。

+0

不错例子(+1)。 – mgilson

回答

6

multiprocessing.Pool有一个_taskqueue类型的成员multiprocessing.Queue,它需要一个可选的maxsize参数;不幸的是它构造它没有maxsize参数集。

我建议将multiprocessing.Poolmultiprocessing.Pool.__init__的副本粘贴maxsize_taskqueue的构造函数。

猴修补的对象(游泳池或队列)也将工作,但你必须猴补丁pool._taskqueue._maxsizepool._taskqueue._sem所以这将是非常脆弱:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

我正在使用Python 2.7.3,而_taskqueue的类型是Queue.Queue。这意味着它是一个简单的Queue,而不是一个multiprocessing.Queue。子类化multiprocessing.Pool和覆盖__init__工作正常,但猴子补丁对象不能按预期工作。但是,这是我正在寻找的黑客,谢谢。 –

0

你可以增加明确的队列在这种情况下使用最大参数并使用queue.put()而不是pool.apply_async()。然后,工作进程可能:

for a, b in iter(queue.get, sentinel): 
    # process it 

如果要限制创建的输入参数/结果是在内存中活动的工作流程的大致数数,那么你可以使用pool.imap*()方法:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

使用'imap'没有什么区别。输入队列仍然是无限的,使用这个解决方案最终会吃掉所有的内存。 – Radim

+0

@Radim:即使您给它一个无限生成器,答案中的“imap”代码也能正常工作。 – jfs

+0

不在Python 2中,不幸的是(没有看过py3中的代码)。对于一些解决方法,请参阅[这个SO答案](http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration)。 – Radim

1

如果pool._taskqueue超出所需尺寸,则等待:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

只是想补充一点,我发现这是针对我的多处理内存问题的最简单的解决方案。我使用max_apply_size = 10,对我的问题工作正常,这是一个缓慢的文件转换。使用信号量作为@ecatmur建议似乎是一个更强大的解决方案,但对于简单的脚本可能是矫枉过正的。 – Nate

相关问题