How to parallelise scipy sparse matrix multiplication

I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a dense numpy array W, making use of parallelism. After some research I found that I need to use Array in multiprocessing in order to avoid copying X and W between processes (e.g. from here: How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:
import multiprocessing
import numpy
import scipy.sparse
import time

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    global XData
    global XIndices
    global XIndptr
    global Xshape

    XData = data
    XIndices = indices
    XIndptr = indptr
    Xshape = shape

    global WArray
    global WShape

    WArray = Warr
    WShape = Wshp

def dot2(args):
    rowInds, i = args

    global XData
    global XIndices
    global XIndptr
    global Xshape

    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X):
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

    # Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))

        params = []
        for i in range(numJobs):
            params.append((rowInds, i))

        iterator = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1]))

        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = iterator[i]

        return P

    return matmat

if __name__ == '__main__':
    # Create a random sparse matrix X and a random dense one W
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time() - startTime

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time() - startTime

    print(parallelTime, nonParallelTime)
However, the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.
I believe that a slowdown occurs in similar situations when one copies large data to the processes, but that is not the case here since I use Array to store the shared variables (unless the copying happens in numpy.frombuffer or when the csr_matrix is created, but I could not find a way to share a csr_matrix directly). Another possible cause of the slowness is that each process returns a large result for its matrix multiplication, but I am not sure of a way around this.
Can anyone see where I am going wrong? Thanks for any help!
Update: I can't be sure, but I think sharing large amounts of data between processes is simply not very efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html), although many of the numpy functions need to go through the GIL.
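For comparison, the thread-based alternative mentioned above can be sketched as follows. Threads share X and W with no copying at all, but whether this runs any faster depends on how much of the multiplication actually releases the GIL (threadedDot is my name for the helper, not an existing function):

```python
import threading
import numpy
import scipy.sparse

def threadedDot(X, W, numThreads=2):
    P = numpy.zeros((X.shape[0], W.shape[1]))
    bounds = numpy.linspace(0, X.shape[0], numThreads + 1).astype(int)

    def work(i):
        # Each thread writes its own row block; no locking is needed
        # since the blocks are disjoint.
        P[bounds[i]:bounds[i+1], :] = X[bounds[i]:bounds[i+1], :].dot(W)

    threads = [threading.Thread(target=work, args=(i,)) for i in range(numThreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return P

X = scipy.sparse.rand(200, 80, 0.1).tocsr()
W = numpy.random.rand(80, 5)
P = threadedDot(X, W)
print(numpy.allclose(P, X.dot(W)))  # True
```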
Do you have numpy/scipy linked against an optimised, multithreaded ATLAS build? If you do, you should get parallel matrix multiplication for free when you use np.dot. –

I am using a multithreaded BLAS library (OpenBLAS) linked to numpy/scipy, and I tested X.dot(W) and numpy.dot(X, W) (the latter does not work for sparse X), but this is not parallelised. – Charanpal
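For reference, one way to check which BLAS/LAPACK libraries a numpy build is linked against (relevant to the comment above) is numpy's own build-configuration report:

```python
import numpy

# Print the build configuration, including the BLAS/LAPACK libraries
# numpy was linked against (e.g. OpenBLAS, ATLAS, MKL).
numpy.show_config()
```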