2014-04-17 153 views
1

我有下面的代码现在可以使用,但我不认为随着进程和发送数据的数量增长将会扩展。MPI发送和接收(多对多)

这里是我要去:

首先我有一个发送回路,每个处理器将消息发送到所有其他。 每个进程发送的消息长度将不同,但不是类型。

for (int i = 0; i < n_proc; ++i){ 
    if (i != my_rank){ 
      int N = Xcoord_top[my_rank].size(); 
      MPI_Send(&Xcoord_top[my_rank][0], N, MPI_DOUBLE, i, 1000, MPI_COMM_WORLD); 
    } 
} 
MPI_Barrier(MPI_COMM_WORLD); 

我发送的消息后,我收到他们使用类似的循环

for (int i = 0; i < n_proc; ++i){ 
    if (i != my_rank){ 
     std::vector<double> temp(max_n); 
     MPI_Recv(&temp[0], points_per_proc[i], MPI_DOUBLE, 
        MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); 
     ... 
    } 
} 

在第二循环中我也有把信息在正确的地方基于标签几行,消息来源

这只有当我把环路之间的障碍,否则崩溃。

根据我的理解,消息的MPI内部存储可能会溢出(我不确定是否使用正确的术语)。所以在这种情况下,程序会在第一个循环中挂起。

任何想法我应该怎么做呢?

回答

2

有点过分代码以适应评论:

我建议只是在做这个作为一个单一的MPI_Allgatherv()

std::vector<int> disps(n_proc); 

disps[0] = 0; 
for (int i=1; i<n_proc; i++) 
    disps[i] = disps[i-1] + points_per_proc[i-1]; 

int totdata = disps[n_proc-1] + points_per_proc[n_proc-1]; 
std::vector<double> temp(totdata); 

MPI_Allgatherv(&Xcoord_top[my_rank][0], Xcoord_top[my_rank].size(), 
       MPI_Double, temp, points_per_proc, disps, MPI_DOUBLE, 
       MPI_COMM_WORLD); 

现在的数据PROC itemp[disps[i]]...temp[disps[i+1]-1]

有至少三个问题最初发布的代码:

  • 它很可能死锁(发送允许阻塞,直到收到) - 这可能是固定使用异步发送,例如MPI_Isend()具有以下MPI_Waitall()而不是MPI_Send();
  • 它几乎肯定会处理收到的乱序(不能保证它在从第i个处理器接收到的第i次迭代中),所以消息长度可能是错误的,从而导致错误,从而中止程序 - 可以通过修复来源为i而不是MPI_ANY_SOURCE来修复;和
  • 这是效率低下,使用线性点对点发送和接收,而不是像广播或聚集优化的集体 - 可以通过使用集合,如allgather,如上所述修复。
+0

感谢它就这么简单! 只用于记录disps应该被计算为 disps [i] = disps [i-1] + points_per_proc [i-1];'和proc'i'的数据应该在'temp [disps [i ]] ... temp [disps [i + 1] -1]' – giorgk

+0

非常正确,我会相应地更正答案。 –