2017-08-05 28 views
1

如何通过numpy.apply_along_axis()将NumPy数组元素的函数应用并行化以利用多核?这似乎是一件很自然的事情,在所有对所使用函数的调用都是独立的情况下。numpy.apply_along_axis()的简单并行化?

在我的特殊情况下(如果这很重要),应用轴是轴0:np.apply_along_axis(func, axis=0, arr=param_grid)np是NumPy)。

我在有一个快速浏览一下Numba,但我似乎无法得到这个并行,与像一个循环:

@numba.jit(parallel=True) 
result = np.empty(shape=params.shape[1:]) 
for index in np.ndindex(*result.shape)): # All the indices of params[0,...] 
    result[index] = func(params[(slice(None),) + index]) # Applying func along axis 0 

还有显然是编译选项在与NumPy进行并行通过OpenMP,但似乎无法通过MacPorts访问。

人们也可能会想到可能会在几块中切割阵列,并使用线程(以避免复制数据)并将每个块上的函数并行应用。这比我所寻找的要复杂得多(如果Global Interpreter Lock没有足够的发布版本,这可能不起作用)。

能够以简单的方式使用多个内核对于简单的可并行化任务(比如将一个函数应用于数组的所有元素(这基本上就是这里所需要的) func()取一维数组参数)。

+0

'apply_along_axis'是纯粹的Python代码,除了将感兴趣的轴转置到最后,并且对其余的部分执行'ndindex(arr.shape [: - 1])'以外,您所做的只是显示。替代方法已经在像https://stackoverflow.com/questions/45067268/numpy-vectorized-2d-array-operation-error – hpaulj

+0

这样的帖子中讨论过了,因为第二个问题可以重新设计为2d(您的感兴趣轴加上其余部分),基本问题是1d列表理解。遍历行。另一个SO问题:https://stackoverflow.com/questions/44239498/how-to-apply-a-generic-function-over-numpy-rows – hpaulj

+0

我希望这些StackOverflow问题包含一个解决方案,使用多个核心,我可以使用!现在,我不确定Python列表理解如何成功比'np.apply_along_axis()'更快,但是至少可以通过探索'np.apply_along_axis()'的简单替代方法来加快单核版本的速度...... – EOL

回答

1

好吧,我的工作了:一个想法是使用标准multiprocessing模块并在短短数块原始数组拆分(以便限制与工人沟通的开销)。这可以如下被相对容易地完成:

import multiprocessing 

import numpy as np 

def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs): 
    """ 
    Like numpy.apply_along_axis(), but takes advantage of multiple 
    cores. 
    """   
    # Effective axis where apply_along_axis() will be applied by each 
    # worker (any non-zero axis number would work, so as to allow the use 
    # of `np.array_split()`, which is only done on axis 0): 
    effective_axis = 1 if axis == 0 else axis 
    if effective_axis != axis: 
     arr = arr.swapaxes(axis, effective_axis) 

    # Chunks for the mapping (only a few chunks): 
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs) 
       for sub_arr in np.array_split(arr, multiprocessing.cpu_count())] 

    pool = multiprocessing.Pool() 
    individual_results = pool.map(unpacking_apply_along_axis, chunks) 
    # Freeing the workers: 
    pool.close() 
    pool.join() 

    return np.concatenate(individual_results) 

其中Pool.map()正在应用的功能unpacking_apply_along_axis()是独立的,因为它应该(使得子过程可以导入的话),并且是一个简单的薄包装处理该事实Pool.map()只需要一个参数:

def unpacking_apply_along_axis((func1d, axis, arr, args, kwargs)): 
    """ 
    Like numpy.apply_along_axis(), but and with arguments in a tuple 
    instead. 

    This function is useful with multiprocessing.Pool().map(): (1) 
    map() only handles functions that take a single argument, and (2) 
    this function can generally be imported from a module, as required 
    by map(). 
    """ 
    return np.apply_along_axis(func1d, axis, arr, *args, **kwargs) 

在我的具体情况,这导致2个内核和超线程2倍的速度提升。接近4倍的因素会更好,但加速已经很不错了,只需几行代码,对于具有更多内核的机器(这很常见)应该会更好。也许有避免数据拷贝和使用共享内存的方法(可能通过multiprocessing module本身)?