0

我有一个Class,它在状态中存储一个大型的数组numpy。这导致multiprocessing.Pool变得非常缓慢。这里有一个MRE:状态为大型数组的多处理

from multiprocessing import Pool 
import numpy 
import time 
from tqdm import tqdm 

class MP(object): 
    def __init__(self, mat): 
     self.mat = mat 

    def foo(self, x): 
     time.sleep(1) 
     return x*x + self.mat.shape[0] 

    def bar(self, arr): 
     results = [] 
     with Pool() as p: 
      for x in tqdm(p.imap(self.foo, arr)): 
       results.append(x) 
     return results 

if __name__ == '__main__': 
    x = numpy.arange(8) 
    mat = numpy.random.random((1,1)) 
    h = MP(mat) 
    res = h.bar(x) 
    print(res) 

我已经有了CPU 4个内核,这意味着该代码应该(和不)运行约2秒钟。 (tqdm显示2秒为进度条,这个例子没有必要)。但是,在主程序中,如果我做mat = numpy.random.random((10000,10000)),则需要永久运行。我怀疑这是因为Pool正在为每个工作人员复制mat,但我不确定这是如何工作的,因为mat处于Class的状态,并且不直接参与imap调用。所以,我的问题是:

  1. 为什么会发生这种情况? (即,Pool如何在一个类中工作?它到底具体是什么?制作了哪些副本以及通过引用传递了什么?)
  2. 什么是可行的解决方法?

编辑:修改foo做出的mat使用,这更代表我真正的问题。

+0

你的主程序中有多大'x'? –

+1

在你的主程序中,你传递给'p.imap'的函数是否需要'MP'的方法,或者它可以是一个未绑定的函数? –

+0

@JeremyMcGibbon好点。我想我的例子并不能很好地表达我的真正问题。所以,函数确实需要是'MP'的方法,因为函数实际上是从'mat'中读取的。 – ved

回答

0

如果像你说mat不直接参与imap电话,我猜一般的MP状态未在imap调用中使用(如果是这样,下面的评论,我会删除这个答案) 。如果是这种情况,则应该将foo写为未绑定函数,而不是MP的方法。 mat现在被复制的原因是因为foo的每个执行都需要在self中传递,其中包含self.mat

下执行得很快,不管垫的大小:

from multiprocessing import Pool 
import numpy 
import time 
from tqdm import tqdm 


class MP(object): 

    def __init__(self, mat): 
     self.mat = mat 

    def bar(self, arr): 
     results = [] 
     with Pool() as p: 
      for x in tqdm(p.imap(foo, arr)): 
       results.append(x) 
     return results 

def foo(x): 
    time.sleep(1) 
    return x * x 

if __name__ == '__main__': 
    x = numpy.arange(8) 
    mat = numpy.random.random((10000, 10000)) 
    h = MP(mat) 
    res = h.bar(x) 
    print(res) 

如果foo实际上并需要传递MP,因为它实际上并不需要从mat阅读,那么有没有办法避免发送mat到每个处理器,而你的问题2没有答案,除了“你不能”。但希望我已经回答了你的问题1.