2011-04-07 20 views
11

我必须解决一个小问题。我有4个从属进程,每个进程都希望发送一个2d子数组(CHUNK_ROWS X CHUNK_COLUMNS)给主机0.主机0收集所有在ddd [ROWS] [COLUMNS]中的数据块并打印出来。我想用MPI_Gather()MPI_Type_create_subarray和MPI_Gather

#include <mpi.h> 
#include <iostream> 
using namespace std; 

#define ROWS 10 
#define COLUMNS 10 
#define CHUNK_ROWS 5 
#define CHUNK_COLUMNS 5 
#define TAG 0 

int** alloca_matrice(int righe, int colonne) 
{ 
int** matrice=NULL; 
int i; 

matrice = (int **)malloc(righe * sizeof(int*)); 

if(matrice != NULL){ 
    matrice[0] = (int *)malloc(righe*colonne*sizeof(int)); 
    if(matrice[0]!=NULL) 
    for(i=1; i<righe; i++) 
     matrice[i] = matrice[0]+i*colonne; 
    else{ 
    free(matrice); 
    matrice = NULL; 
    } 
} 
else{ 
    matrice = NULL; 
} 
return matrice; 

} 

int main(int argc, char* argv[]) 
{ 

int my_id, numprocs,length,i,j; 
int ndims, sizes[2],subsizes[2],starts[2]; 
int** DEBUG_CH=NULL; 
int** ddd=NULL; 
char name[BUFSIZ]; 
MPI_Datatype subarray=NULL; 
//MPI_Status status; 
MPI_Init(&argc, &argv) ;  
MPI_Comm_rank(MPI_COMM_WORLD, &my_id) ; 
MPI_Comm_size(MPI_COMM_WORLD, &numprocs) ; // Ottiene quanti processi sono attivi 
MPI_Get_processor_name(name, &length);  

if(my_id!=0){ 
    //creo una sottomatrice ripulita dalle ghost cells 
    ndims=2; 
    sizes[0] = CHUNK_ROWS+2; 
    sizes[1] = CHUNK_COLUMNS+2; 
    subsizes[0] = CHUNK_ROWS; 
    subsizes[1] = CHUNK_COLUMNS; 
    starts[0] = 1; 
    starts[1] = 1; 
    MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&subarray); 
    MPI_Type_commit(&subarray); 

    DEBUG_CH = alloca_matrice(CHUNK_ROWS+2,CHUNK_COLUMNS+2); 
    for(i=0; i<CHUNK_ROWS+2; i++){ 
    for(j=0; j<CHUNK_COLUMNS+2; j++){ 
     if(i==0 || i==CHUNK_ROWS+1 || j==0 || j==CHUNK_COLUMNS+1) 
      DEBUG_CH[i][j] = 5; 
     else 
      DEBUG_CH[i][j] = 1; 
    } 
    } 
//MPI_Send(DEBUG_CH[0],1,subarray,0,TAG,MPI_COMM_WORLD); 
} 
if(my_id==0){ 
ddd = alloca_matrice(ROWS,COLUMNS); 
} 

MPI_Gather(DEBUG_CH[0],1,subarray,ddd[0],CHUNK_ROWS*CHUNK_COLUMNS,MPI_INT,0,MPI_COMM_WORLD); 
if(!my_id){ 
    for(i=0; i<ROWS; i++){ 
    for(j=0; j<COLUMNS; j++){ 
     printf("%d ",ddd[i][j]); 
    } 
    printf("\n"); 
    } 
} 

if(my_id) 
MPI_Type_free(&subarray); 

MPI_Finalize();        // Chiusura di MPI. 
return 0; 
} 

谢谢大家。

回答

20

所以这有点微妙,需要对Gather集合如何放置复杂类型有所了解。

如果你看最多examples of MPI_Gather,它们是一维数组,它很容易解释应该发生什么;你会从每个进程中获得(比如说)10个整数,而且Gather很聪明,可以从开始的0级开始放入10个整数,在阵列的10-19个开始放置10个整数,依此类推。

虽然这样更复杂的布局稍微复杂一点。首先,从发送者的角度来看,数据布局与接收者的数据布局不同。从发送者的角度来看,您从数组元素[1][2]开始,转到[1][5](数组大小为7x7),然后跳转到数组元素[2][3] - [2][5]等。 有CHUNK_ROWS个数据块,每个数据块之间用2个整数。

现在考虑接收者如何接收它们。假设它正在接收0级的数据。它将接收到数组元素[0][0]-[0][4] - 迄今为止非常好;但是接下来它将接收到大小为10x10的数组[1][0]-[1][4]中的下一个数据块。这是跳过5个元素。内存中的布局不同。因此,接收器将不得不接收不同的Subarray类型,然后发送者从中发送,因为存储器布局是不同的。

因此,尽管你可以从一些可以发送看起来像这样:

sizes[0] = CHUNK_ROWS+2; 
    sizes[1] = CHUNK_COLUMNS+2; 
    subsizes[0] = CHUNK_ROWS; 
    subsizes[1] = CHUNK_COLUMNS; 
    starts[0] = 1; 
    starts[1] = 1; 
    MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&sendsubarray); 
    MPI_Type_commit(&sendsubarray); 

您将会收到到的东西,看起来像这样:

sizes[0] = ROWS; sizes[1] = COLUMNS; 
    subsizes[0] = CHUNK_ROWS; subsizes[1] = CHUNK_COLUMNS; 
    starts[0] = 0; starts[1] = 0; 
    MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&recvsubarray); 
    MPI_Type_commit(&recvsubarray); 

最重要的是,注意到在区别sizes数组。

现在我们正在接近一点。请注意您的MPI_Gather线变为这样:

MPI_Gather(DEBUG_CH[0],1,sendsubarray,recvptr,1,recvsubarray,0,MPI_COMM_WORLD); 

有几件事情没有工作对以前的版本,MPI_Gather(DEBUG_CH[0],1,subarray,ddd[0],CHUNK_ROWS*CHUNK_COLUMNS,MPI_INT,0,MPI_COMM_WORLD); - 首先,请注意,如果引用ddd[0],但除了每个等级等级0,ddd=NULL,所以这将失败。因此创建一个名为say recvptr的新变量,并在等级零中将其设置为ddd[0]。 (其他进程认为它并不重要,因为他们没有收到。)另外,我认为你不想接收CHUNK_ROWS*CHUNK_COLUMSMPI_INTs,因为这会将它们连续放置在内存中,我的理解是你希望它们像从属任务一样排列,但是在更大的阵列中。

好的,现在我们正在某处,但上述仍然将无法​​正常工作,一个有趣的原因。对于1d阵列示例,很容易计算出第n个排序数据的位置。计算的方式是找到正在收到的数据的范围,并在该值后面开始下一个元素。但这在这里不起作用。 “就在”等级0的数据末尾不是排名第一的数据应该开始的地方([0][5]),而是[4][5] - 排名0s子阵列中的最后一个元素之后的元素。在这里,你从不同级别收集的数据重叠!所以我们将不得不摆弄数据类型的范围,并手动指定每个等级数据的起始位置。第二个是容易的部分;当您需要手动指定每个处理器的数据量或其位置时,可以使用MPI_Gatherv函数。首先是棘手的部分。

MPI让我们来指定给定数据类型的下限和上限 - 在给定一段内存的情况下,此类型的第一位数据将会出现,以及“结束”的位置,这只是意味着下一个可以开始的地方。 (数据可以延伸超过类型的上限,我认为这些名称使得这些名称具有误导性,但事实就是这样。)您可以指定这是任何您喜欢的方式,以方便您;由于我们将处理数组int中的元素,因此让我们将我们的MPI_INT类型的大小设置为大小。 (注意,我们只需要为接收的类型做这件事;从发送类型来看,因为我们只发送其中之一,所以没关系)。

现在,我们将使用gatherv指定每个元素的起始位置 - 以这个新调整大小类型的“大小”为单位,该大小仅为1整数。因此,如果我们想要在[0][5]处进入大阵列,大阵列的起始位移为5;如果我们希望它在位置[5][5]处移动,则位移为55.

最后,请注意,收集和分散集体都假定即使“主”也参与。如果连主人都有他们自己的全局数组,那么最简单的方法就是让它工作。

因此,既然,我以下工作:

#include <mpi.h> 
#include <iostream> 
#include <cstdlib> 
using namespace std; 

#define ROWS 10 
#define COLUMNS 10 
#define CHUNK_ROWS 5 
#define CHUNK_COLUMNS 5 
#define TAG 0 

int** alloca_matrice(int righe, int colonne) 
{ 
    int** matrice=NULL; 
    int i; 

    matrice = (int **)malloc(righe * sizeof(int*)); 

    if(matrice != NULL){ 
     matrice[0] = (int *)malloc(righe*colonne*sizeof(int)); 
     if(matrice[0]!=NULL) 
      for(i=1; i<righe; i++) 
       matrice[i] = matrice[0]+i*colonne; 
     else{ 
      free(matrice); 
      matrice = NULL; 
     } 
    } 
    else{ 
     matrice = NULL; 
    } 
    return matrice; 

} 

int main(int argc, char* argv[]) 
{ 

    int my_id, numprocs,length,i,j; 
    int ndims, sizes[2],subsizes[2],starts[2]; 
    int** DEBUG_CH=NULL; 
    int** ddd=NULL; 
    int *recvptr=NULL; 
    char name[BUFSIZ]; 
    MPI_Datatype sendsubarray; 
    MPI_Datatype recvsubarray; 
    MPI_Datatype resizedrecvsubarray; 
    //MPI_Status status; 
    MPI_Init(&argc, &argv) ;  
    MPI_Comm_rank(MPI_COMM_WORLD, &my_id) ; 
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs) ; // Ottiene quanti processi sono attivi 
    if (numprocs != 4) { 
     MPI_Abort(MPI_COMM_WORLD,1); 
    } 
    MPI_Get_processor_name(name, &length);  

    //creo una sottomatrice ripulita dalle ghost cells 
    ndims=2; 
    sizes[0] = CHUNK_ROWS+2; 
    sizes[1] = CHUNK_COLUMNS+2; 
    subsizes[0] = CHUNK_ROWS; 
    subsizes[1] = CHUNK_COLUMNS; 
    starts[0] = 1; 
    starts[1] = 1; 
    MPI_Type_create_subarray(ndims,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&sendsubarray); 
    MPI_Type_commit(&sendsubarray); 

    DEBUG_CH = alloca_matrice(CHUNK_ROWS+2,CHUNK_COLUMNS+2); 
    for(i=0; i<CHUNK_ROWS+2; i++){ 
     for(j=0; j<CHUNK_COLUMNS+2; j++){ 
      if(i==0 || i==CHUNK_ROWS+1 || j==0 || j==CHUNK_COLUMNS+1) 
       DEBUG_CH[i][j] = 5; 
      else 
       DEBUG_CH[i][j] = my_id; 
     } 
    } 

    recvptr=DEBUG_CH[0]; 
    if(my_id==0){ 
     ddd = alloca_matrice(ROWS,COLUMNS); 
     sizes[0] = ROWS; sizes[1] = COLUMNS; 
     subsizes[0] = CHUNK_ROWS; subsizes[1] = CHUNK_COLUMNS; 
     starts[0] = 0; starts[1] = 0; 
     MPI_Type_create_subarray(2,sizes,subsizes,starts,MPI_ORDER_C,MPI_INT,&recvsubarray); 
     MPI_Type_commit(&recvsubarray); 
     MPI_Type_create_resized(recvsubarray, 0, 1*sizeof(int), &resizedrecvsubarray); 
     MPI_Type_commit(&resizedrecvsubarray); 
     recvptr = ddd[0]; 
    } 

    int counts[5]={1,1,1,1}; 
    int disps[5] ={0,5,50,55}; 
    MPI_Gatherv(DEBUG_CH[0],1,sendsubarray,recvptr,counts,disps,resizedrecvsubarray,0,MPI_COMM_WORLD); 
    if(!my_id){ 
     for(i=0; i<ROWS; i++){ 
      for(j=0; j<COLUMNS; j++){ 
       printf("%d ",ddd[i][j]); 
      } 
      printf("\n"); 
     } 
    } 

    if(my_id == 0) { 
     MPI_Type_free(&resizedrecvsubarray); 
     MPI_Type_free(&recvsubarray); 
     free(ddd[0]); 
     free(ddd); 
    } else { 
     MPI_Type_free(&sendsubarray); 
     free(DEBUG_CH[0]); 
     free(DEBUG_CH); 
    } 

    MPI_Finalize();        // Chiusura di MPI. 
    return 0; 
} 
+0

哇。我想我不明白mpi_gather是如何工作的......非常感谢。我将打印您的答案并将其作为备忘录。 – Riff 2011-04-07 19:48:20