2013-04-12

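Making my NumPy array shared across processes

I have read all the questions on shared arrays, and it seems simple enough for simple arrays, but I am still stuck trying to get it to work for my array: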

import numpy as np 
data=np.zeros(250,dtype='float32, (250000,2)float32') 
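The dtype string above describes a structured dtype whose fields NumPy names f0 (a single float32) and f1 (a (250000, 2) block of float32). A minimal sketch with the block shrunk to (4, 2), chosen only to keep memory use small, shows how the fields are laid out and accessed:

```python
import numpy as np

# Same layout as in the question, but with a (4, 2) block instead of (250000, 2)
data = np.zeros(3, dtype='float32, (4,2)float32')

print(data.dtype.names)     # ('f0', 'f1'): names generated by NumPy
print(data['f0'].shape)     # (3,): one float32 per record
print(data['f1'].shape)     # (3, 4, 2): one (4, 2) block per record
print(data.dtype.itemsize)  # 36 bytes per record: 4 + 4 * 2 * 4
```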

I have tried to somehow make mp.Array accept data and turn it into a shared array. I have also tried creating the array with ctypes like this:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)  # raises an error: mp.Array expects a ctypes type or a one-letter typecode

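The only way I have managed to get my code working is by not passing data to the function, but passing an encoded string to be decompressed/decoded instead. However, this would end up calling n processes (n being the number of strings), which seems redundant. My desired implementation is based on slicing the list of binary strings into x chunks (x being the number of processes) and passing each chunk, data and an index to the processes. That works, except that data is only modified locally in each process, hence my question on how to make it shared. Any example working with a custom (nested) NumPy array would already be a great help.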
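The slicing scheme described above (one chunk of encoded strings per process, passed together with the index of its first string) can be sketched as follows. split_with_offsets is a hypothetical helper; the last chunk absorbs any remainder so that no strings are dropped when the list length is not a multiple of the number of processes:

```python
def split_with_offsets(items, n_chunks):
    """Split items into n_chunks consecutive slices.

    Returns (chunk, start) pairs, where start is the index of the chunk's
    first element in items, i.e. the record index a worker should write to.
    """
    size = len(items) // n_chunks
    pairs = []
    for i in range(n_chunks):
        start = i * size
        # The final chunk runs to the end so a remainder is not lost
        stop = len(items) if i == n_chunks - 1 else start + size
        pairs.append((items[start:stop], start))
    return pairs


print(split_with_offsets(list("abcdefg"), 3))
# [(['a', 'b'], 0), (['c', 'd'], 2), (['e', 'f', 'g'], 4)]
```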

PS: This question is a follow-up to Python multi-processing

What is mp? The multiprocessing module? – delnan

Yes, I will add that to the question. –

Answer

Note that you can start out with an array of the complex dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32') 

and view it as an array with a homogeneous dtype:

In [5]: data2 = data.view('float32') 

and later convert it back to the complex dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32') 

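Changing the dtype with view is a very fast operation: it does not touch the underlying data, only the way NumPy interprets it, so it costs virtually nothing.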
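A small sketch of that round trip, using a (4, 2) block in place of (250000, 2) only to keep it light. np.shares_memory confirms that the homogeneous view is not a copy, and a value written through the structured dtype shows up at the expected flat position (record 1 starts at float 9, its f1 block at float 10, and element [2, 0] of that block at 10 + 2 * 2 = 14):

```python
import numpy as np

dt = np.dtype('float32, (4,2)float32')  # small stand-in for the question's dtype
data = np.zeros(3, dtype=dt)
data['f1'][1, 2, 0] = 7.5

flat = data.view('float32')  # homogeneous float32 view, no copy
back = flat.view(dt)         # structured view again, no copy

print(flat.shape)                    # (27,): 3 records * 9 floats each
print(np.shares_memory(data, flat))  # True: same buffer
print(flat[14], back['f1'][1, 2, 0])
```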

So what you have read about sharing arrays with a simple (homogeneous) dtype can readily be applied to your complex dtype with the trick above.
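Applied to multiprocessing, this amounts to allocating a flat shared array of c_float large enough for all records and viewing it with the structured dtype. A minimal sketch, again with a reduced (4, 2) block; n_floats is computed from dtype.itemsize, which works here because every field is float32, so the itemsize is a multiple of 4. For the real dtype the same formula gives 250 * 500001 = 125000250 floats, which equals (250000 * 2 * 250) + 250:

```python
import ctypes
import multiprocessing as mp

import numpy as np

dt = np.dtype('float32, (4,2)float32')   # small stand-in for the real dtype
n_records = 3
n_floats = n_records * dt.itemsize // 4  # number of c_float slots needed

shared_arr = mp.Array(ctypes.c_float, n_floats)  # lock-protected shared memory

# Wrap the shared buffer without copying, then reinterpret it as records
data = np.frombuffer(shared_arr.get_obj(), dtype=np.float32).view(dt)

data['f0'][2] = 1.0
data['f1'][2, 3, 1] = 2.0
print(data.shape)       # (3,)
print(shared_arr[18])   # record 2 starts at float 2 * 9 = 18
print(shared_arr[26])   # 18 + 1 (skip f0) + 3 * 2 + 1 = 26
```

Writes through data land directly in shared_arr, so a worker that receives shared_arr (for example through a pool initializer) and builds the same view sees and modifies the same memory.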


The code below borrows many ideas from J.F. Sebastian's answer, here.

import base64
import contextlib
import ctypes
import multiprocessing as mp
import struct

import numpy as np

# One record: a float32 followed by a (250000, 2) block of float32
RECORD_DTYPE = np.dtype([('f0', '<f4'), ('f1', '<f4', (250000, 2))])


def decode(arg):
    chunk, counter = arg
    print(len(chunk), counter)
    # View the shared buffer as records once per call (no copy is made)
    data = tonumpyarray(shared_arr).view(RECORD_DTYPE)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4      # number of 32-bit words
        unpack_format = ">%dL" % buff_size   # big-endian unsigned 32-bit ints
        for index, y in enumerate(struct.unpack(unpack_format, data_buff)):
            # Reinterpret the bits of the 32-bit integer as a float32
            buff2 = struct.unpack("f", struct.pack("I", y))[0]
            with shared_arr.get_lock():
                if index % 2 == 0:
                    data[counter][1][peak_counter][0] = float(buff2)  # m/z
                else:
                    data[counter][1][peak_counter][1] = float(buff2)  # intensity
                    peak_counter += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    # The shared array holds c_float values, so read the buffer as float32
    return np.frombuffer(mp_arr.get_obj(), dtype=np.float32)


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr,))) as pool:
        chunk_size = len(peaks) // processors
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # The right slice index is exclusive, so no -1 is needed. The last
            # chunk runs to the end so that leftover peaks are not dropped.
            end = len(peaks) if i == processors - 1 else (i + 1) * chunk_size
            chunk = peaks[counter:end]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)


if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...  # list of base64-encoded strings read from the mzXML file
    numpy_array(shared_arr, peaks)
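The inner loop of decode unpacks one 32-bit word at a time with struct and reinterprets it as a float. Assuming, as decode does, that each base64 string holds big-endian float32 values alternating m/z and intensity, NumPy can decode a whole string in one call. decode_peaks is a hypothetical helper sketched here, not part of the code above:

```python
import base64
import struct

import numpy as np


def decode_peaks(encoded):
    """Decode one base64 string into an (n_peaks, 2) float32 array.

    Assumes big-endian float32 values alternating m/z, intensity, i.e. the
    same layout the struct-based loop in decode assumes.
    """
    raw = base64.b64decode(encoded)
    values = np.frombuffer(raw, dtype='>f4')          # big-endian float32
    return values.astype(np.float32).reshape(-1, 2)   # native order, pairs


# Round trip: encode three (m/z, intensity) pairs as big-endian float32
pairs = [100.5, 10.0, 200.25, 20.0, 300.0, 30.5]
encoded = base64.b64encode(struct.pack('>6f', *pairs))
print(decode_peaks(encoded))
```

With data viewed as in decode, a record's block could then be filled with a single slice assignment such as data['f1'][counter, :n] = block, where n is the number of rows in the decoded block; this also means the lock (if kept) is taken once per string instead of once per value.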

If you can guarantee that the processes executing the assignments

if (index % 2 == 0): 
    data[counter][1][peak_counter][0] = float(buff2) 
else: 
    data[counter][1][peak_counter][1] = float(buff2) 

never compete to change the data at the same location, then I believe you could actually drop the lock

with shared_arr.get_lock(): 

But I don't know your code well enough, so to be on the safe side I included the lock.
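If the writes really never overlap, one lock-free option (a sketch, not something tested against the code above) is to allocate the buffer with lock=False. mp.Array then returns a plain ctypes array without get_lock() or get_obj(), so it can be handed to np.frombuffer directly; mp.RawArray behaves the same way. It would still need to reach the workers through the pool initializer, as pool_init does:

```python
import ctypes
import multiprocessing as mp

import numpy as np

n_floats = 12
# lock=False returns a raw ctypes array: no get_lock(), no get_obj()
raw_arr = mp.Array(ctypes.c_float, n_floats, lock=False)
view = np.frombuffer(raw_arr, dtype=np.float32)

# Writes to disjoint ranges, standing in for workers whose chunks
# start at i * chunk_size and never overlap
view[0:6] = 1.0
view[6:12] = 2.0

print(hasattr(raw_arr, 'get_lock'))  # False
print(raw_arr[5], raw_arr[6])        # 1.0 2.0
```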

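Because of the counter I pass to the function (computed as i * chunk_size), the processes are guaranteed never to access the same range of data. I will try your answer tomorrow morning and will most likely accept it. –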

import ctypes
from multiprocessing import Process, Array

import numpy as np


def fun(a):
    # Negate the first element; the change lands in shared memory
    a[0] = -a[0]
    # Wrap the shared buffer as a NumPy array without copying
    c = np.frombuffer(a.get_obj(), dtype=np.float32)
    c.shape = 3, 3
    print('child', c)


def main():
    a = np.random.rand(3, 3).astype(np.float32)
    a.shape = a.size  # flatten to 1-D
    # Array() copies the values of a into shared memory; a itself stays private
    h = Array(ctypes.c_float, a)
    print("Originally,", h[:])

    # Create, start, and finish the child process
    p = Process(target=fun, args=(h,))
    p.start()
    p.join()  # wait for the child instead of sleeping
    a.shape = 3, 3
    # a is unchanged: only the shared copy h was modified by the child
    print('first', a)
    print('main', np.frombuffer(h.get_obj(), dtype=np.float32).reshape(3, 3))


if __name__ == "__main__":
    main()
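Array(ctypes.c_float, a) initialises the shared array from a element by element through ctypes. For large arrays, a sketch that allocates the shared buffer first and then copies a in with a single NumPy assignment should avoid that per-element conversion. h is still what gets passed to Process; shared is only a NumPy view of it:

```python
import ctypes
from multiprocessing import Array

import numpy as np

a = np.random.rand(3, 3).astype(np.float32)

h = Array(ctypes.c_float, a.size)  # zero-filled shared buffer of the right size
shared = np.frombuffer(h.get_obj(), dtype=np.float32).reshape(a.shape)
shared[...] = a                    # one bulk copy into shared memory

print(np.array_equal(shared, a))   # True
print(np.shares_memory(shared, a)) # False: a itself is not shared
```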