2013-05-29 65 views
0

我发现openmp不支持while循环(或者至少不喜欢它们太多)。 也不喜欢'!='操作符。openmp - while循环文本文件读取和使用管道

我有这一点的代码。

int count = 1; 
#pragma omp parallel for 
    while (fgets(buff, BUFF_SIZE, f) != NULL) 
    { 
     len = strlen(buff); 
     int sequence_counter = segment_read(buff,len,count); 
     if (sequence_counter == 1) 
     { 
      count_of_reads++; 
      printf("\n Total No. of reads: %d \n",count_of_reads); 
     } 
    count++; 
    } 

任何线索,如何管理?我读了一个地方(包括stackoverflow的另一篇文章),我可以使用管道。那是什么 ?以及如何实施它?

+0

编译提供了一个链接到计算器后你看的,请。 – Shahbaz

+1

@Shahbaz,我想他可能是指该SO发布 http://stackoverflow.com/questions/8121077/fread-slow-performance-in-openmp-threads – 2013-05-30 08:29:47

+0

其实...这一个.. HTTP ://stackoverflow.com/questions/7532067/parallelize-while-loop-with-openmp 但这也是相关的! – Sid5427

回答

3

在OpenMP中实现“parallel while”的一种方法是使用创建任务的while循环。这是一个普遍的草图:

void foo() { 
    while(Foo* f = get_next_thing()) { 
#pragma omp task firstprivate(f) 
     bar(f); 
    } 
#pragma omp taskwait 
} 

对于遍历与fgets的具体情况,注意与fgets具有固有的顺序语义(它获得了“下一个”行),这样就需要启动任务之前调用。对于每个任务来说,对fgets返回的数据副本进行操作也很重要,这样对fgets的调用不会覆盖前一个任务所操作的缓冲区。

+0

这些任务可以调用函数吗?即调用函数的任务,在char数组上执行函数,并返回一个值? – Sid5427

+0

是的,任务可以调用函数。你必须小心的是确保没有两个并发任务互相干扰。例如,如果我忘记在我的例子中写下“firstprivate(f),那么当bar(f)开始实际启动时,f的值可能已经消失或者被下一次迭代覆盖 –

+3

'f'这是默认的firstprivate,'task'构造中的'firstprivate'子句是多余的。 –

1

首先,即使它非常接近,但openmp不会奇迹般地让你的代码并行。它适用于for,因为for具有它可以理解的上限和下限。 Openmp使用这些界限在不同线程之间划分工作。

while循环没有这种可能。

其次,你如何期望你的任务是并行化的?您正在从一个文件中读取数据,其中顺序访问可能会给您比并行访问更好的性能。您可能会并行segment_read(基于其实施)。

或者,您可能希望将文件读取与处理重叠。为此,您需要使用更多的低级函数,例如Unix的openread函数。然后,执行异步读取,即发送读取请求,处理最后一个读取块,然后等待读取请求完成。例如,搜索“linux异步io”以阅读更多内容。

使用管道可能实际上并没有多大帮助。这将取决于我不熟悉的许多管道内部结构。但是,如果您拥有足够大的内存,则可能还需要考虑先加载整个数据,然后再处理它。这样,加载数据的速度将尽可能快(按顺序),然​​后您可以并行处理它们。

10

这太糟糕了,人们很快选择最佳答案。这是我的答案。
首先,你应该把文件读入一个类似fread的缓冲区。这很快。如何做到这一点的例子可以在这里找到http://www.cplusplus.com/reference/cstdio/fread/

然后,你可以在OpenMP上并行操作缓冲区。我已经为你实施了大部分。以下是代码。你没有提供segment_read函数,所以我创建了一个虚拟函数。我使用了C++中的一些函数,比如std :: vector和std :: sort,但在纯C中也可以做更多的工作。

编辑︰ 我编辑此代码,并能够删除排序和关键部分。

g++ foo.cpp -o foo -fopenmp -O3

#include <stdio.h> 
#include <omp.h> 
#include <vector> 

using namespace std; 

int segment_read(char *buff, const int len, const int count) { 
    return 1; 
} 

void foo(char* buffer, size_t size) { 
    int count_of_reads = 0; 
    int count = 1; 
    std::vector<int> *posa; 
    int nthreads; 

    #pragma omp parallel 
    { 
     nthreads = omp_get_num_threads(); 
     const int ithread = omp_get_thread_num(); 
     #pragma omp single 
     { 
      posa = new vector<int>[nthreads]; 
      posa[0].push_back(0); 
     } 

     //get the number of lines and end of line position 
     #pragma omp for reduction(+: count) 
     for(int i=0; i<size; i++) { 
      if(buffer[i] == '\n') { //should add EOF as well to be safe 
       count++; 
       posa[ithread].push_back(i); 
      } 
     } 

     #pragma omp for  
     for(int i=1; i<count ;i++) {  
      const int len = posa[ithread][i] - posa[ithread][i-1]; 
      char* buff = &buffer[posa[ithread][i-1]]; 
      const int sequence_counter = segment_read(buff,len,i); 
      if (sequence_counter == 1) { 
       #pragma omp atomic 
       count_of_reads++; 
       printf("\n Total No. of reads: %d \n",count_of_reads); 
      } 

     } 
    } 
    delete[] posa; 
} 

int main() { 
    FILE * pFile; 
    long lSize; 
    char * buffer; 
    size_t result; 

    pFile = fopen ("myfile.txt" , "rb"); 
    if (pFile==NULL) {fputs ("File error",stderr); exit (1);} 

    // obtain file size: 
    fseek (pFile , 0 , SEEK_END); 
    lSize = ftell (pFile); 
    rewind (pFile); 

    // allocate memory to contain the whole file: 
    buffer = (char*) malloc (sizeof(char)*lSize); 
    if (buffer == NULL) {fputs ("Memory error",stderr); exit (2);} 

    // copy the file into the buffer: 
    result = fread (buffer,1,lSize,pFile); 
    if (result != lSize) {fputs ("Reading error",stderr); exit (3);} 

    /* the whole file is now loaded in the memory buffer. */ 
    foo(buffer, result); 
    // terminate 


    fclose (pFile); 
    free (buffer); 
    return 0; 
} 
+0

我绝对爱护这个答案。很抱歉,最好的答案是“大多数问题似乎让他们尽快解决”。 – Sid5427

+1

我编辑过代码(再次),线程的内部循环是错误的,我遇到的主要问题是线程随机进入,所以segment_read可能不会按顺序调用,这可能不是问题。消息是带有位置的变量是有序的,换句话说,posa [0]是最低位置的向量,posa [7](使用8个线程)是位置最高的向量,所以如果你需要为了您的位置g他们。最初,我使用sort()和一个关键部分来做到这一点,但最新的代码并不需要这样做。 – 2013-05-30 13:57:40

+0

多数民众赞成在我这个问题最主要的是,我希望线被顺序读取,但segment_read中的一些子句可能会提前终止该线程。这会有什么影响吗?我的整个想法是整个segment_read函数并行运行。我有一个8核心机器,所以你可以假设8个segment_reads在文件的8个不同行中运行。 – Sid5427