2016-02-17 23 views
1

最近,我开始研究pthreads并试图用pthreads实现软件流水线操作。要做到这一点,我自己写了一个玩具程序,其中一个类似的程序将成为我主要项目的一部分。使用pthreads进行简单流水线操作

因此,在这个程序中的主线程创建和输入输出缓冲器整数类型,然后创建单个主线程并传递给主线程这些缓冲区。 主线程依次创建两个工作线程

输入和从所述主传递给主线程输出缓冲器是大小Ñ X ķ(例如尺寸为int 5×10)的。 主线程针对n(即5)次重复遍历大小为k(即10)的块。 在主线程k(这里是5)的循环运行次数。在ķ每次迭代主线程确实大小Ñ的输入数据的一部分的一些操作,并将其放置在主工作线程之间共享的共同缓冲主线程然后通知工作线程数据已被放置在公共缓冲区中。

两个工作线程等待来自主线程信号如果公共缓冲器已准备就绪。 公用缓冲区上的操作分为工作线程中的一半。这意味着一个工作线程将在上半年工作,而另一个工作线程将工作在的下一半通用缓冲区。 一旦工作线程主线程得到信号,每个工作线程的做他们的一半的数据的一些操作,并将其复制到输出缓冲。然后工作线程通过设置标志值来通知主线程它们的操作在公用缓冲器上完成。为工作线程创建了一组标志。该主线程不断,如果所有的标志都设置这基本上意味着所有工作线程说完就公共缓冲区他们的操作等主线程可以将下一数据块到公共缓冲区检查安全地为工作线程的消耗。

所以基本上有以流水线的方式在工作线程之间的通信。最后,我在主线程中输出输出缓冲区。但是我根本没有输出。我有复制粘贴我的代码几乎所有步骤的完整评论。

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <sys/types.h> 
#include <sys/time.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <stdbool.h> 
#include <string.h> 

#define MthNum 1 //Number of Master threads 
#define WthNum 2 //Number of Worker threads 
#define times 5 // Number of times the iteration (n in the explanation) 
#define elNum 10 //Chunk size during each iteration (k in the explanation) 

pthread_mutex_t mutex; // mutex variable declaration 
pthread_cond_t cond_var; //conditional variarble declaration 
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends 
          //marking the completion 
int *commonBuff; //common buffer between master and worker threads 
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1 
       // the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1) 
int *commFlags_s; 
int counter; // This counter used my master thread to count if all the commFlags[i] that shows 
      //all the threads finished their work on the common buffer 
// static pthread_barrier_t barrier; 
// Arguments structure passed to master thread 
typedef struct{ 
    int *input; // input buffer 
    int *output;// output buffer 
}master_args; 

// Arguments structure passed to worker thread 
typedef struct{ 
    int threadId; 
    int *outBuff; 
}worker_args; 

void* worker_func(void *arguments); 
void *master_func(void *); 

int main(int argc,char*argv[]){ 

    int *ipData,*opData; 
    int i,j; 

    // allocation of input buffer and initializing to 0 
    ipData = (int *)malloc(times*elNum*sizeof(int)); 
    memset(ipData,0,times*elNum*sizeof(int)); 

    // allocation of output buffer and initializing to 0 
    opData = (int *)malloc(times*elNum*sizeof(int)); 
    memset(opData,0,times*elNum*sizeof(int)); 

    pthread_t thread[MthNum]; 
    master_args* args[MthNum]; 


    //creating the single master thread and passing the arguments 
    for(i=0;i<MthNum;i++){ 
     args[i] = (master_args *)malloc(sizeof(master_args)); 
     args[i]->input= ipData; 
     args[i]->output= opData; 
     pthread_create(&thread[i],NULL,master_func,(void *)args[i]); 
    } 

    //joining the master thred 
    for(i=0;i<MthNum;i++){ 
     pthread_join(thread[i],NULL); 
    } 

    //printing the output buffer values 
    for(j =0;j<times;j++){ 
     for(i =0;i<elNum;i++){ 
      printf("%d\t",opData[i+j*times]); 
     } 
     printf("\n"); 
    } 

    return 0; 
} 

//This is the master thread function 
void *master_func(void *arguments){ 

    //copying the arguments pointer to local variables 
    master_args* localMasterArgs = (master_args *)arguments; 
    int *indataArgs = localMasterArgs->input; //input buffer 
    int *outdataArgs = localMasterArgs->output; //output buffer 

    //worker thread declaration 
    pthread_t Workers[WthNum]; 
    //worker thread arguments declaration 
    worker_args* wArguments[WthNum]; 
    int i,j; 

    pthread_mutex_init(&mutex, NULL); 
    pthread_cond_init (&cond_var, NULL); 
    counter =0; 

    commonBuff = (int *)malloc(elNum*sizeof(int)); 

    commFlags = (int *)malloc(WthNum*sizeof(int)); 
    memset(commFlags,0,WthNum*sizeof(int)); 
    commFlags_s= (int *)malloc(WthNum*sizeof(int)); 
    memset(commFlags_s,0,WthNum*sizeof(int)); 

    for(i =0;i<WthNum;i++){ 

     wArguments[i] = (worker_args*)malloc(sizeof(worker_args)); 
     wArguments[i]->threadId = i; 
     wArguments[i]->outBuff = outdataArgs; 

     pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]); 
    } 

    for (i = 0; i < times; i++) { 
     for (j = 0; j < elNum; j++) 
      indataArgs[i + j * elNum] = i + j; 

     while (counter != 0) { 
      counter = 0; 

      pthread_mutex_lock(&mutex); 
      for (j = 0; j < WthNum; j++) { 
       counter += commFlags_s[j]; 
      } 
      pthread_mutex_unlock(&mutex); 

     } 
     pthread_mutex_lock(&mutex); 
     memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int)); 
     pthread_mutex_unlock(&mutex); 
     counter = 1; 
     while (counter != 0) { 
      counter = 0; 

      pthread_mutex_lock(&mutex); 
      for (j = 0; j < WthNum; j++) { 
       counter += commFlags[j]; 
      } 
      pthread_mutex_unlock(&mutex); 


     } 
     // printf("master broad cast\n"); 
     pthread_mutex_lock(&mutex); 
     pthread_cond_broadcast(&cond_var); 
     //releasing the lock 
     pthread_mutex_unlock(&mutex); 

    } 

    pthread_mutex_lock(&mutex); 
    completion_flag = false; 
    pthread_mutex_unlock(&mutex); 

    for (i = 0; i < WthNum; i++) { 
     pthread_join(Workers[i], NULL); 
    } 

    pthread_mutex_destroy(&mutex); 
    pthread_cond_destroy(&cond_var); 

    return NULL; 
} 


void* worker_func(void *arguments){ 

    worker_args* localArgs = (worker_args*)arguments; 

    //copying the thread ID and the output buffer 
    int tid = localArgs->threadId; 
    int *localopBuffer = localArgs->outBuff; 
    int i,j; 
    bool local_completion_flag=false; 

    while(local_completion_flag){ 

     pthread_mutex_lock(&mutex); 
     commFlags[tid] =0; 
     commFlags_s[tid] =1; 
     pthread_cond_wait(&cond_var,&mutex); 
     commFlags_s[tid] =0; 
     commFlags[tid] =1; 
     if (tid == 0) { 
      for (i = 0; i < (elNum/2); i++) { 
       localopBuffer[i] = commonBuff[i] * 5; 
      } 
     } else { // Thread ID 1 operating on the other half of the common buffer data and placing on the 
       // output buffer 
      for (i = 0; i < (elNum/2); i++) { 
       localopBuffer[elNum/2 + i] = commonBuff[elNum/2 + i] * 10; 
      } 
     } 
     local_completion_flag=completion_flag; 
     pthread_mutex_unlock(&mutex);//releasing the lock 

    } 

    return NULL; 
} 

但我不知道在哪里我在我的实现,因为逻辑上似乎是正确的做了错事。但是我的执行过程肯定有问题。我花了很长时间尝试不同的事情来解决它,但没有任何工作。对不起,这篇文章很长,但是我无法确定哪一部分我可能做错了,所以我无法简化这篇文章。因此,如果任何人都可以看看问题和实施情况,并且可以建议需要做什么修改才能按预期运行,那么这将会非常有帮助。感谢您的帮助和帮助。

+0

是'counter'应该由互斥体进行保护或不?似乎有很多情况下,如果您不通过广播c.v来修改'counter'而不保留mutex或将'counter'设置为零。 - 这些事情可能导致等待已经发生的事情。 –

+0

此外,我建议等待6的声誉量子时间线程睡30英寸的使用例如18 – dewelloper

+0

@DavidSchwartz抱歉,我迟到的答复。计数器将计算工作线程将打开的标志数量。主线程测量工作线程变为1的标志并增加计数器值。所以如果计数器值等于线程数,那么while循环会退出并继续前进,所以计数器不会被工作线程修改。该计数器仅由主线程修改。它将值传递给工作线程,因为等待计数器值的工作线程为0.我不确定我是否需要互斥量 – duttasankha

回答

0

这段代码有几处错误。

  1. 你可以从固定的工作线程的创建开始:

    wArguments[i] = (worker_args*)malloc(sizeof(worker_args)); 
    wArguments[i]->threadId = i; 
    wArguments[i]->outBuff = outdataArgs; 
    
    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments); 
    

要初始化worker_args结构,但不正确的 - 传递指针数组(void *)wArguments,而不是指向你只是初始化数组元素。

void *master_func(void *arguments) 
{ 
/* (...) */ 
pthread_mutex_init(&mutex, NULL); 
pthread_cond_init (&cond_var, NULL); 
counter = WthNum; 
  • 当开始主线程,则错误地传递指针指针:

    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]); 
    //                ^^^ 
    
    开始使用它的值线程之前
  • 初始化计数器

    pthread_create(&thread[i],NULL,master_func,(void *)&args[i]); 
    
  • 请更改为:

    pthread_create(&thread[i],NULL,master_func,(void *) args[i]); 
    
  • 所有访问counter变量(如任何其他共享存储器)必须在线程之间同步。
  • +0

    你好 谢谢你的回复。我包括了这些变化,但该计划仍然没有按预期运行。我认为该计划正在无限循环中等待,而且还没有终止。我是否需要使用互斥锁初始化计数器 – duttasankha

    +0

    @duttasankha是的,当然 - 访问共享变量必须在所有执行路径之间同步 – 4pie0

    +0

    我将计数器置于互斥锁中。但它仍然不起作用。 – duttasankha