2013-02-27

I'm getting a system error (shown below) while running some simple numpy-based matrix algebra calculations in parallel with the multiprocessing package (Python 2.7.3 with numpy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code works fine for smaller matrix sizes, but crashes for larger ones (with plenty of available memory).

The matrix sizes I use are substantial (my code runs fine for 1000000x10 dense float matrices but crashes for 1000000x500 ones; I pass these matrices to/from subprocesses, by the way). 10 vs. 500 is a runtime parameter; everything else stays the same (input data, other runtime parameters, etc.).

I also tried running the same (ported) code with python3. For the larger matrices the subprocesses go into sleep/idle mode (instead of crashing as in Python 2.7), and the program/subprocesses just hang there doing nothing. For the smaller matrices the code runs fine under python3.

Any suggestions would be highly appreciated (I'm running out of ideas here).

Error message:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call

The multiprocessing code I use:

import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add the result queue to each argument tuple.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create and initialize the pool of workers.
    pool = mp.Pool(processes=nParallelProcesses)
    # Run the processes; map blocks until all of them finish.
    pool.map(proc, listOfInputsNew)
    pool.close()
    pool.join()
    # Collect and return the results.
    return [resultQueue.get() for i in range(len(listOfInputs))]
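
For what it's worth, here is a minimal, self-contained sketch of the same pool-plus-Manager-queue pattern with a toy worker (the names `toy_proc` and `runToyInParallel` are made up for this sketch, and `mp.get_context('fork')` is pinned explicitly since, like the original setup, this only targets POSIX). It can help check that the plumbing itself works before blaming the matrix sizes:

```python
import multiprocessing as mp

def toy_proc(param):
    # Unpack ((chunk id, payload), queue), mirroring the original layout.
    (chunkI, x), queue = param
    queue.put((chunkI, x * x))

def runToyInParallel(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return []
    ctx = mp.get_context('fork')          # POSIX-only, like the original setup
    resultQueue = ctx.Manager().Queue()
    listOfInputsNew = [(t, resultQueue) for t in listOfInputs]
    pool = ctx.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)       # map blocks until all workers finish
    pool.close()
    pool.join()
    return [resultQueue.get() for _ in listOfInputs]

if __name__ == '__main__':
    results = dict(runToyInParallel(toy_proc, [(0, 2), (1, 3), (2, 4)], 2))
    print(sorted(results.items()))        # [(0, 4), (1, 9), (2, 16)]
```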

Below is the "proc" that gets executed by each subprocess. Basically, it solves many systems of linear equations using numpy (it constructs the required matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values of one runtime parameter, but crashes (or hangs in python3) for larger ones.

import sys
import time
import numpy as np

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        # getChunkBoundaries is defined elsewhere in the program.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            # Sum of outer products of the rows in this slice.
            sumLFVinOuterProductLFVpurch += np.sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :], axis=0)
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV:', time.time() - startTime, 'sec'
    sys.stdout.flush()

Can you share the code of the proc function? – barracel 2013-03-01 22:10:33


Just did. I didn't describe proc's arguments - some of them are matrices, some are lists of lists, and some are just floats/ints. 'queue' is used to return the result of each subprocess. – Yevgeny 2013-03-01 22:40:05

Answer


500,000,000 floats is pretty big: if you're using float64, that's 4 billion bytes, or about 4 GB. (The 10,000,000-float array would be 80 million bytes, or about 80 MB - much smaller.) I expect the problem has to do with multiprocessing trying to pickle the arrays to send them to the subprocesses over a pipe; in Python 2.7, pickling a single object of several GB is known to fail with exactly this kind of SystemError.
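
To make those sizes concrete, here is a quick back-of-the-envelope check (pure arithmetic, no large allocations; the variable names are just for illustration):

```python
import numpy as np

itemsize = np.dtype(np.float64).itemsize   # 8 bytes per float64
small = 1000000 * 10 * itemsize            # the 1000000x10 case
large = 1000000 * 500 * itemsize           # the 1000000x500 case
print(small)   # 80000000  (~80 MB)
print(large)   # 4000000000 (~4 GB)
```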

Since you're on a unix platform, you can avoid this behavior by exploiting the memory-inheritance behavior of fork() (which is used to create multiprocessing's workers). I've had success with this hack (ripped out of this project), described in the comments.

import itertools
import os
import random
import string

### A helper for letting the forked processes use data without pickling.
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
    for _ in itertools.count())

class ForkedData(object):
    '''
    Class used to pass data to child processes in multiprocessing without
    really pickling/unpickling it. Only works on POSIX.

    Intended use:
        - The master process makes the data somehow, and does e.g.
            data = ForkedData(the_value)
        - The master makes sure to keep a reference to the ForkedData object
          until the children are all done with it, since the global reference
          is deleted to avoid memory leaks when the ForkedData object dies.
        - Master process constructs a multiprocessing.Pool *after*
          the ForkedData construction, so that the forked processes
          inherit the new global.
        - Master calls e.g. pool.map with data as an argument.
        - Child gets the real value through data.value, and uses it read-only.
    '''
    # TODO: does data really need to be used read-only? don't think so...
    # TODO: more flexible garbage collection options
    def __init__(self, val):
        g = globals()
        self.name = next(n for n in _data_name_cands if n not in g)
        g[self.name] = val
        self.master_pid = os.getpid()

    @property
    def value(self):
        return globals()[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            del globals()[self.name]
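
Here is a runnable sketch of how the idea is meant to be used. It restates a stripped-down version of the class inline so the example is self-contained; the names `ForkedDataDemo`, `_SHARED`, and `child_sum` are illustrative only, and `mp.get_context('fork')` is requested explicitly since the trick depends on the fork start method:

```python
import multiprocessing as mp
import os

# Stripped-down restatement of the ForkedData idea, inlined for the demo.
_SHARED = {}

class ForkedDataDemo(object):
    def __init__(self, val, name='_forked_demo'):
        _SHARED[name] = val               # stash the value in a module global
        self.name = name
        self.master_pid = os.getpid()

    @property
    def value(self):
        # In a forked child this reads the copy inherited from the parent,
        # so the value itself is never pickled.
        return _SHARED[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            _SHARED.pop(self.name, None)

def child_sum(data):
    # Only the tiny ForkedDataDemo handle travels through the pipe.
    return sum(data.value)

if __name__ == '__main__':
    data = ForkedDataDemo(list(range(1000)))
    ctx = mp.get_context('fork')          # the trick requires fork()
    pool = ctx.Pool(2)                    # pool created *after* the data
    print(pool.map(child_sum, [data, data]))   # [499500, 499500]
    pool.close()
    pool.join()
```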