在使用多处理程序包(Amazon EC2上的Ubuntu 12.04上的numpy 1.7.0使用python 2.73)并行执行一些简单的基于numpy的矩阵代数计算时,出现系统错误(如下所示) 。我的代码对于较小的矩阵大小工作正常,但对较大的内存崩溃(具有足够的可用内存)使用多重处理运行子进程时出现系统错误
我使用的矩阵大小很大(我的代码对1000000x10浮点密集矩阵运行正常,但崩溃了1000000x500个 - 我顺便通过这些矩阵到/从子进程)。 10 vs 500是一个运行时参数,其他所有内容都保持不变(输入数据,其他运行时参数等)
我也尝试使用python3运行相同(移植)的代码 - 对于较大的矩阵子进程进入睡眠/空闲模式(而不是像Python 2.7中的崩溃),程序/子进程只是挂在那里无所事事。对于较小的矩阵,代码可以在python3下正常运行。
任何建议,将不胜感激(我运行的想法在这里)
错误消息:
Exception in thread Thread-5: Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run() File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task) SystemError: NULL result without error in PyObject_Call
的多重代码我使用:
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
if len(listOfInputs) == 0:
return
# Add result queue to the list of argument tuples.
resultQueue = mp.Manager().Queue()
listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
# Create and initialize the pool of workers.
pool = mp.Pool(processes = nParallelProcesses)
pool.map(proc, listOfInputsNew)
# Run the processes.
pool.close()
pool.join()
# Return the results.
return [resultQueue.get() for i in range(len(listOfInputs))]
下面是“ proc“执行每个子进程。基本上,它使用numpy解决了许多线性方程组的系统(它在子过程内构造了所需的矩阵)并将结果作为另一个矩阵返回。再一次,它适用于一个运行时参数的较小值,但对于较大的参数会崩溃(或挂在python3中)。
def solveForLFV(param):
startTime = time.time()
(chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
LFoutChunkSize = XY.shape[0]
nLFdim = LFVin.shape[1]
sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
for LFVoutIndex in xrange(LFoutChunkSize):
LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
sumLFVinOuterProductLFVpurch[:, :] = 0.
LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
queue.put((chunkI, LFVoutChunk))
print 'solveForLFV: ', time.time() - startTime, 'sec'
sys.stdout.flush()
你可以分享proc函数的代码吗? – barracel 2013-03-01 22:10:33
刚刚做到了。我没有描述proc的论点 - 其中一些是矩阵,一些是列表清单,有些只是浮点/整数。 'queue'用于返回每个子进程的结果。 – Yevgeny 2013-03-01 22:40:05