2012-12-14 40 views
3

我试图并行化我写的脚本。每个进程都需要进行计算并将数据存储到数组的特定部分(列表列表)。每个进程都在计算和存储它的数据,但我无法弄清楚如何从非根进程获取数据到根进程,以便它可以将数据打印到文件中。我创建了一个最小的工作示例我的脚本---这一个被设计为在2个核心只为简单运行:如何使用MPI在Python中的进程之间共享数据?

from mpi4py import MPI 
import pdb 
import os 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

# Declare the array that will store all the temp results 
temps = [[0 for x in xrange(5)] for x in xrange(4)] 

# Loop over all directories 
if rank==0: 
    counter = 0 
    for i in range(2): 
     for j in range(5): 
     temps[i][j] = counter 
    counter = counter + 1 

else: 
    counter = 20 
    for i in range(2,4): 
     for j in range(5): 
     temps[i][j] = counter 
     counter = counter + 1 

temps = comm.bcast(temps,root=0) 

if rank==0: 

    print temps 

我执行使用脚本:

mpiexec -n 2 python mne.py 

当的情况下完成,输出是:

[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]] 

因此你可以看到数据共享不能按我的想法工作。有人可以告诉我正确的方法,让数据回到根进程吗?

回答

4

代码工作正常,只是没有做你想做的事情。

此行

temps = comm.bcast(temps,root=0) 

广播处理器0的temps变量的所有处理器(包括等级0),这当然给正上方的结果。你想使用gather(或allgather,如果你想所有的处理器有答案)。这看起来更多的东西是这样的:

from mpi4py import MPI 
import pdb 
import os 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

assert size == 2 

# Declare the array that will store all the temp results 
temps = [[0 for x in xrange(5)] for x in xrange(4)] 

# declare the array that holds the local results 
locals =[[0 for x in xrange(5)] for x in xrange(2)] 

# Loop over all directories 
if rank==0: 
    counter = 0 
    for i in range(2): 
     for j in range(5): 
     locals[i][j] = counter 
     counter = counter + 1 

else: 
    counter = 20 
    for i in range(2): 
     for j in range(5): 
     locals[i][j] = counter 
     counter = counter + 1 

temps = comm.gather(locals,temps,root=0) 

if rank==0: 
    print temps 

如果你真的想要做的就地收集,和你知道的(说),所有的真实数据将是比你初始化为零较大数据与,你可以使用简化操作,但是对于numpy数组来说更容易:

from mpi4py import MPI 
import numpy 

comm = MPI.COMM_WORLD 
size = comm.Get_size() 
rank = comm.Get_rank() 

assert size == 2 

# Declare the array that will store all the temp results 
temps = numpy.zeros((4,5)) 

# Loop over all directories 
if rank==0: 
    counter = 0 
    for i in range(2): 
     for j in range(5): 
     temps[i,j] = counter 
     counter = counter + 1 

else: 
    counter = 20 
    for i in range(2,4): 
     for j in range(5): 
     temps[i,j] = counter 
     counter = counter + 1 

comm.Allreduce(MPI.IN_PLACE,temps,op=MPI.MAX) 

if rank==0: 
    print temps 
+0

这样做,谢谢! – rks171

相关问题