2017-09-12 42 views
0

我试图让Python多处理工作来加速我写的代码。代码如下所示:在多处理中使用共享数组来保存值

from multiprocessing import Array, Pool 
import numpy as np 
#setting up shared memory array 
global misfit 
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat) 

#looping through some values 
for i in xrange(0,1): 
    #setting up pool 
    pool = Pool() 
    p = [pool.apply_async(self.testfunc,args=(somevals,j)) for j in xrange(0,1)] 
    pool.close() 
    pool.join() 

凡self.testfunc样子:

def testfunc(self,somevals,j): 
     #some calculations 
     for k in xrange(0,1): 
      #some calculations 
      for mn in xrange(0,1): 
       #some more calculations 
       #save results 
       result = i*j*k*mn # example 
       misfit[i*j*k*mn] = result 

我的问题是,当我运行这个没有被保存在共享阵列的价值观,它仍然空。我知道这可能与全局变量有关,但是在使用此准确设置的更简单的程序中,值将保存到数组中。整个程序中的数组也很大(4561920000值)。此外,如果我在池外调用此函数,它将起作用并保存值。

所以我的问题是我在做什么错在这里?我是否正确发送共享数组?

编辑:想我会在作品中添加代码:

from multiprocessing import Array, Pool 
from numpy import empty, sin 
from time import time 
import numpy as np 

def initarr(): 
    a = Array('d', empty((5, 50, 80)).flat) 
    return a 

def testfunc(i, j, k): 
    count = (i*50*80) + (j*80) + k 
    x = sin(k) 
    a[count] = x 
    y = np.fft.fft(np.exp(2j*np.pi*np.arange(50000)/50000)) 


def process(i): 
    start = time() 
    pool = Pool() 
    for j in xrange(0, 50): 
    p = [pool.apply_async(testfunc, args=(i, j, k)) for k in xrange(0, 80)] 
    pool.close() 
    pool.join() 
    print time() - start 


global a 
a = initarr() 

for i in xrange(0, 5): 
    process(i) 

回答

0

好了,所以从我们的IT部门的人的帮助下,我终于有了一个版本,这是工作的,所以任何人在未来查看这个问题,我会发布一个解决方案。我并没有真正使用堆栈溢出,所以很抱歉,如果回答我自己的问题是不好的礼节。

我们使用初始化函数得到了这个工作,但我们必须确保初始化函数在相同的文件(模块)中,作为由池运行的函数。因此一个模块(MISC)中,我们有:

**misc.py** 
def testfunc(self,somevals,j): 
    #some calculations 
    for k in xrange(0,len(krange)): 
     #some calculations 
     for mn in xrange(0,len(mnrange)): 
      #some more calculations 
      #save results 
      loc = (i*len(jrange)*len(krange)*len(mnrange))+ 
        (j*len(krange)*len(mnrange))+(k*len(mnrange))+mn 
      result = i*j*k*mn # example 
      misfit[loc] = result 

def initpool(a): 
    global misfit 
    misfit = a 

,并在主文件有:

**main.py** 
from multiprocessing import Array, Pool 
from misc import initpool, testfunc 
import numpy as np 

#setting up shared memory array 
misfit = Array('d', np.empty((dim1,dim2,dim3,dim4)).flat) 

#looping through some values 
for i in xrange(0,len(irange)): 
    #setting up pool 
    pool = Pool(initializer=initpool,initargs=(misfit,),processes=20) 
    p = [pool.apply_async(testfunc,args=(somevals,j)) for j in xrange(0,len(jrange))] 
    pool.close() 
    pool.join() 

print(misfit[0]) 

注意当我们最初设置的阵列,它必须被命名为同样的作为你在initpool中设置的变量,至少从我测试它的时候开始。

这可能不是最好的方式来做到这一点,但它的工作原理,并希望其他人可能会找到它的用处!