2011-09-05 35 views
11

我有一个256x256x256 Numpy数组,其中每个元素是一个矩阵。我需要对这些矩阵中的每一个做一些计算,并且我想使用multiprocessing模块来加快速度。结合itertools和多处理?

这些计算的结果必须被存储在一个阵列256x256x256像原来之一,使得在元件[i,j,k]原始阵列中的矩阵的结果必须在新阵列的[i,j,k]元件放。

要做到这一点,我想列出一个列表,可以伪造的方式编写为[array[i,j,k], (i, j, k)]并将它传递给一个函数以“多处理”。 假设matrices是从原来的阵列和myfunc提取的所有矩阵的名单正在做的计算功能,代码看起来有点像这样:

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

然而,这似乎是map_async实际上是创建这个巨大的finput - 列表第一:我的CPU没有太多的工作,但内存和交换在几秒钟内完全消耗,这显然不是我想要的。

有没有办法将这个庞大的列表传递给一个多处理函数,而无需先显式创建它? 或者你知道解决这个问题的另一种方法吗?

非常感谢! :-)

+1

由于您在'map_async()'上使用'get()',因此您可能不希望执行* asynchronous *操作,而应该使用'Pool.map()'。 –

+0

也许我不明白这个问题,但你有没有考虑过imap或imap_unordered? –

回答

10

全部multiprocessing.Pool.map*方法一旦函数被调用就完全消耗迭代器(demo code)。喂迭代一个组块的映射函数的块的时间,使用grouper_nofill

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS。 pool.map_asyncchunksize参数做了一些不同的操作:它将迭代器分解为块,然后将每个块分配给一个称为map(func,chunk)的工作进程。如果func(item)完成得太快,这可以为工作进程提供更多数据,但它不会对您的情况有所帮助,因为在发出调用map_async之后,迭代器仍会立即充分消耗。

+0

非常感谢!你的解决方案似乎确实工作!作为参考,我不得不使用pool.map_async(myfunc,finput).get(999999),但它的工作原理!但是,它仍然使用大量内存(当然取决于确切的chunksize),并且python在运行期间似乎不是垃圾收集。任何想法,为什么这可能是? – digitaldingo

+0

@digitaldingo:嗯,没有任何想法。如果您可以将代码缩减为[SSCCE](http://sscce.org/)并将其发布到此处,那将是理想之选。 – unutbu

0

Pool.map_async()需要知道可迭代的长度以将工作分派给多个工作者。由于izip没有__len__,因此它会将迭代器首先转换为列表,导致您遇到的巨大内存使用量。

您可以尝试通过使用__len__创建自己的izip样式迭代器来避开此问题。

+0

它为什么需要知道?为什么它不能简单地喂饱所有闲置的工人和等待? –

+0

@andrew - 'map_async()'('multiprocessing/pool.py')中的第一行实际上是'if not hasattr(iterable,'__len__'):iterable = list(iterable)'。它需要知道长度以创建足够大的输出列表,因为工人的完成顺序未知。 –

+0

嗯。它可以动态构建它,不是吗?我只是想这可能会成为一个问题。这似乎是一个有效的请求。 –

2

我也遇到了这个问题。而不是这样的:

res = p.map(func, combinations(arr, select_n)) 

res = p.imap(func, combinations(arr, select_n)) 

IMAP不消耗它!