2015-09-05 59 views
2

我有一个MPI程序,用于从包含文件名列表的文件中读取多个进程,并基于读取的文件名 - 它读取相应的文件并计数单词的频率。当一个MPI进程执行MPI_Barrier()时,其他进程挂起

如果其中一个进程完成并返回 - 阻止执行MPI_Barrier(),则其他进程也会挂起。在调试时,可以看到函数没有输入process_files() readFile()函数无法找出原因。请看以下代码:

#include <stdio.h> 
#include <stdlib.h> 
#include <mpi.h> 
#include <ctype.h> 
#include <string.h> 
#include "hash.h" 

void process_files(char*, int* , int, hashtable_t*); 

void initialize_word(char *c,int size) 
{ 
    int i; 
    for(i=0;i<size;i++) 
     c[i]=0; 

    return; 
} 



char* readFilesList(MPI_File fh, char* file,int rank, int nprocs, char* block, const int overlap, int* length) 
{ 
    char *text; 
    int blockstart,blockend; 

    MPI_Offset size; 
    MPI_Offset blocksize; 
    MPI_Offset begin; 
    MPI_Offset end; 
    MPI_Status status; 

    MPI_File_open(MPI_COMM_WORLD,file,MPI_MODE_RDONLY,MPI_INFO_NULL,&fh); 
    MPI_File_get_size(fh,&size); 

    /*Block size calculation*/ 
    blocksize = size/nprocs; 
    begin = rank*blocksize; 
    end = begin+blocksize-1; 

    end+=overlap; 

    if(rank==nprocs-1) 
     end = size; 

    blocksize = end-begin+1; 

    text = (char*)malloc((blocksize+1)*sizeof(char)); 
    MPI_File_read_at_all(fh,begin,text,blocksize,MPI_CHAR, &status); 
    text[blocksize+1]=0; 

    blockstart = 0; 
    blockend = blocksize; 

    if(rank!=0) 
    { 
     while(text[blockstart]!='\n' && blockstart!=blockend) blockstart++; 
     blockstart++; 
    } 

    if(rank!=nprocs-1) 
    { 

     blockend-=overlap; 
     while(text[blockend]!='\n'&& blockend!=blocksize) blockend++; 
    } 



    blocksize = blockend-blockstart; 

    block = (char*)malloc((blocksize+1)*sizeof(char)); 
    block = memcpy(block, text + blockstart, blocksize); 
    block[blocksize]=0; 
    *length = strlen(block); 

    MPI_File_close(&fh); 
    return block; 
} 

void calculate_term_frequencies(char* file, char* text, hashtable_t *hashtable,int rank) 
{ 
    printf("Start File %s, rank %d \n\n ",file,rank); 
    fflush(stdout); 
    if(strlen(text)!=0||strlen(file)!=0) 
    { 

     int i,j; 
     char w[100]; 
     i=0,j=0; 
     while(text[i]!=0) 
     { 
      if((text[i]>=65&&text[i]<=90)||(text[i]>=97&&text[i]<=122)) 
      { 
       w[j]=text[i]; 
       j++; i++; 
      } 

      else 
      { 

       w[j] = 0; 
       if(j!=0) 
       { 
        //ht_set(hashtable, strcat(strcat(w,"#"),file),1); 
       } 
       j=0; 
       i++; 
       initialize_word(w,100); 
      } 

     } 
    } 
    return; 
} 

void readFile(char* filename, hashtable_t *hashtable,int rank) 
{ 
    MPI_Status stat; 
    MPI_Offset size; 
    MPI_File fx; 
    char* textFromFile=0; 

    printf("Start File %d, rank %d \n\n ",strlen(filename),rank); 
    fflush(stdout); 

    if(strlen(filename)!=0) 
    { 
     MPI_File_open(MPI_COMM_WORLD,filename,MPI_MODE_RDONLY,MPI_INFO_NULL,&fx); 
     MPI_File_get_size(fx,&size); 

     printf("Start File %s, rank %d \n\n ",filename,rank); 
     fflush(stdout); 

     textFromFile = (char*)malloc((size+1)*sizeof(char)); 
     MPI_File_read_at_all(fx,0,textFromFile,size,MPI_CHAR, &stat); 
     textFromFile[size]=0; 
     calculate_term_frequencies(filename, textFromFile, hashtable,rank); 

     MPI_File_close(&fx); 

    } 

    printf("Done File %s, rank %d \n\n ",filename,rank); 
    fflush(stdout); 
    return; 
} 

void process_files(char* block, int* length, int rank,hashtable_t *hashtable) 
{ 

    char s[2]; 
    s[0] = '\n'; 
    s[1] = 0; 

    char *file; 
    if(*length!=0) 
    { 
     /* get the first file */ 
     file = strtok(block, s); 

     /* walk through other tokens */ 
     while(file != NULL) 
     { 
      readFile(file,hashtable,rank); 
      file = strtok(NULL, s); 
     } 
    } 
    return; 
} 

void execute_process(MPI_File fh, char* file, int rank, int nprocs, char* block, const int overlap, int * length, hashtable_t *hashtable) 
{ 

    block = readFilesList(fh,file,rank,nprocs,block,overlap,length); 
    process_files(block,length,rank,hashtable); 
} 


int main(int argc, char *argv[]){ 

    /*Initialization*/ 
    MPI_Init(&argc, &argv); 
    MPI_File fh=0; 
    int rank,nprocs,namelen; 
    char *block=0; 
    const int overlap = 70; 
    char* file = "filepaths.txt"; 
    int *length = (int*)malloc(sizeof(int)); 

    hashtable_t *hashtable = ht_create(65536); 

    MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs); 

    char processor_name[MPI_MAX_PROCESSOR_NAME]; 
    MPI_Get_processor_name(processor_name, &namelen); 
    printf("Rank %d is on processor %s\n",rank,processor_name); 
    fflush(stdout); 

    execute_process(fh,file,rank,nprocs,block,overlap,length,hashtable); 

    printf("Rank %d returned after processing\n",rank); 
    MPI_Barrier(MPI_COMM_WORLD); 

    MPI_Finalize(); 
    return 0; 

} 

的filepaths.txt是包含普通文本文件的绝对文件名的文件:

如:

/home/mpiuser/mpi/MPI_Codes/code/test1.txt 
/home/mpiuser/mpi/MPI_Codes/code/test2.txt 
/home/mpiuser/mpi/MPI_Codes/code/test3.txt 
+0

这个readFilesList看起来相当复杂,你确定它在那里生成正确的块大小吗?我认为你不会从并行化这部分代码中获得很多收益。读取单个文本文件(与实际数据相比,该文件相对较小,您希望从这些文件读取)在单个进程中更容易完成,甚至可能更快。因此,我会在单个进程中读取该列表并进行广播或分散结果列表的文件。 – haraldkl

+0

在我看来,随后你继续让每个进程读取其中一个文件,而不是所有的读取部分。如果是这种情况,您不要在这里使用MPI_IO! MPI_read_all操作要求所有进程参与此文件的调用。 – haraldkl

+1

readFilesList正在生成文件列表的非重叠块。但是,我会尝试单个进程的建议来读取它,并使用scatter将它们分配给进程。 – Dhanashree

回答

0

你readFilesList功能是非常令人迷惑,我相信它没有做你想做的事情,但也许我不明白它的正确性。我相信它应该从每个进程的列表文件中收集一大堆文件名。每个过程都有不同的设置。它并没有这样做,但这不是问题,即使这样做会达到你想要的效果,后续的MPI IO也无法工作。

读取文件时,使用MPI_File_read_all和MPI_COMM_WORLD作为通信器。这需要所有进程参与阅读这个文件。现在,如果每个进程都应该读取不同的文件,这显然不会起作用。

所以你的实现有几个问题,但我不能真正解释你描述的行为,我宁愿先开始并尝试修复它们,然后再详细调试,发生什么问题。

我的印象,你想有沿着这些线路的算法:

  • 读取文件名列表

  • 分发文件,这些文件列表同样给所有进程

  • 让每个进程在其自己的一组文件上工作

  • 使用来自thi的加工

而且我建议用下面的方法来试试这个:

  • 阅读单个进程的列表(无MPI IO)

  • 分散的文件列表到所有进程,以便所有工作都可以解决相同数量的工作

  • 让每个进程独立并以串行方式工作在其文件列表上(串行文件根据需要

我相信访问和处理)

  • 与MPI一些数据减少,这将是您的情况最好的(最简单和最快)的策略。请注意,根本不涉及MPI IO。我不认为在第一步中对文件列表进行一些复杂的分布式读取会导致这方面的任何优势,并且在实际的处理过程中它实际上是有害的。您的流程越独立,通常您的可扩展性越好。

  • +0

    我想我已经在一定程度上误解了MPI-IO,并将其用于完全独立的文件处理。我将readFiles()中使用MPI-IO的代码部分改为串行文件访问,并且工作正常。感谢您的详细解释。 – Dhanashree

    +0

    MPi-IO方法仍然可以与MPI_COMM_SELF一起使用。例如,您不再获得集体I/O优化,但您确实在程序和底层文件系统之间获得了抽象层 - 例如,您可能希望在Windows和Unix之间进行移植。 –