2013-03-28 138 views
2

我正在尝试设置几个线程来处于等待状态,直到他们收到pthread_cond_broadcast()pthread播放然后等待?

完成作业后,我想线程返回到他们的等待状态。

我也想那个叫pthread_cond_broadcast()等待所有线程,然后再继续回到自己的等待状态的过程。在这种情况下,它是调用广播的main()函数。我试图做的板状的这个B(0做一个pthread_cond_wait()调用播出后。

void* Work::job(void* id) 
{ 
    int idx = (long)id; 

    while(1) 
    { 
     pthread_mutex_lock(&job_lock); 

     while(!jobs_complete) 
     { 
      // wait for main to broadcast 
      pthread_cond_wait(&can_work, &job_lock); 
      pthread_mutex_unlock(&job_lock); 

      // work here 

      pthread_mutex_lock(&job_lock); 
      ++jobs_completed; 

      if(jobs_completed == NUM_THREADS) 
      { 
       jobs_complete = true; 
       pthread_cond_signal(&jobs_done); 
       pthread_mutex_unlock(&job_lock); 
      } 
      pthread_mutex_unlock(&job_lock); 
     } 

     pthread_mutex_unlock(&job_lock); 
    } 

    return NULL; 
} 

NUM_THREADS是4,job_lockpthread_mutex_tcan_workjobs_donepthread_cond_tjobs_completedbooljobs_completeint

// work 

jobs_completed = false; 
jobs_complete = 0; 
pthread_mutex_lock(&job_lock); 
pthread_cond_broadcast(&can_work); 
pthread_cond_wait(&jobs_complete); 
pthread_mutex_unlock(&job_lock); 

// work that depends on jobs_complete 

现在,我正在通过调用pthread_cond_broadcast()然后pthread_cond_wait()后立即执行此操作,但这似乎是僵局。

谁能解释我应如何做这个或者我哪里错了?我会很感激任何帮助。

谢谢!

+0

它看起来* *喜欢你试图etup一堆线程给所有人等待“去”信号,然后它们集中完成NUM_THREAD个作业,理想情况下每个线程一个,并且一旦NUM_THREAD作业完成,返回等待另一个去信号。这是否准确?我已经看到不少于五个错误,但是想要在确定答案之前确切地知道你想要做什么。 – WhozCraig 2013-03-28 07:48:58

+0

@WhozCraig,是的,这是我正在尝试做的。我还希望调用“发送”信号的功能在继续之前等待所有作业完成。 – noko 2013-03-28 08:04:06

回答

1

你有你的函数结束,这将导致未定义行为3个可能的连续调用pthread_mutex_unlock。你其实不需要两个内在的。如果jobs_completetrue,该线程将退出循环,解除锁定,否则就会陷入循环,需要它的can_work条件等待。

此外,还有

pthread_cond_wait(&jobs_complete); 

你大概的意思:

pthread_cond_wait(&jobs_complete,&job_lock); 

此外,该功能需要一个pthread_cond_t *pthread_mutex_t *,不是int,所以即使那么代码是明显的打破。

请注意,条件变量上的信号或广播只会对等待变量的线程已有产生影响。信号不会保留以备将来等待。 因此,当线程在jobs_complete上循环时阻塞并再次等待,它们将不得不再次发信号以恢复工作。

另一件事:你所提到的类型的job_completeintjob_completedbool,但你的代码似乎并不同意:

 if(jobs_completed == NUM_THREADS) 
     { 
      jobs_complete = true; 

这里是我的建议:学习上的信号灯和阻隔抽象模型,如果可以的话,使用现有的实现方式(在C booststd ++ 11),或使用pthread API否则重新实现它们。这些将比操纵cond变量更容易帮助你处理这种情况。看看这个网站上的现有解决方案。例如this question涉及一个非常类似的问题,我提供的解决方案可以很容易地修改为使用pthread API来满足您的要求。

3

我只是张贴这(这几乎是所有的C代码,但这样是并行线程,所以有点松弛恳请)证明做我认为你想实现的一种方式。很明显,你会想要正确地将这些内容封装在适当的类中等等。希望能够告诉你的是,谓词管理和通知的条件变量,互斥锁以及它们与的关系是如何工作的。

我希望你觉得它有用。祝你有美好的一天。

#include <iostream> 
#include <unistd.h> 
#include <pthread.h> 
using namespace std; 

// our global condition variable and mutex 
pthread_cond_t cv = PTHREAD_COND_INITIALIZER; 
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; 

// our predicate values. 
bool finished = false; 
int jobs_waiting = 0; 
int jobs_completed = 0; 

// our thread proc 
static void *worker_proc(void* p) 
{ 
    intptr_t id = (intptr_t)p; // our id 
    size_t n_completed = 0;  // our job completion count 

    // always latch prior to eval'ing predicate vars. 
    pthread_mutex_lock(&mtx); 
    while (!finished) 
    { 
     // wait for finish or work-waiting predicate 
     while (!finished && jobs_waiting == 0) 
      pthread_cond_wait(&cv, &mtx); 

     // we own the mutex, so we're free to look at, modify 
     // etc. the values(s) that we're using for our predicate 
     if (finished) 
      break; 

     // must be a job_waiting, reduce that number by one, then 
     // unlock the mutex and start our work. Note that we're 
     // changing the predicate (jobs_waiting is part of it) and 
     // we therefore need to let anyone that is monitoring know. 
     --jobs_waiting; 
     pthread_cond_broadcast(&cv); 
     pthread_mutex_unlock(&mtx); 

     // DO WORK HERE (this just runs a lame summation) 
     for (int i=0,x=0;i<1048576; x += ++i); 
     ++n_completed; 

     // finished work latch mutex and setup changes 
     pthread_mutex_lock(&mtx); 
     ++jobs_completed; 
     pthread_cond_broadcast(&cv); 
    } 

    // final report 
    cout << id << ": jobs completed = " << n_completed << endl; 

    // we always exit owning the mutex, so unlock it now. but 
    // let anyone else know they should be quitting as well. 
    pthread_cond_broadcast(&cv); 
    pthread_mutex_unlock(&mtx); 
    return p; 
} 

// sets up a batch of work and waits for it to finish. 
void run_batch(int num) 
{ 
    pthread_mutex_lock(&mtx); 
    jobs_waiting = num; 
    jobs_completed = 0; 
    pthread_cond_broadcast(&cv); 

    // wait or all jobs to complete. 
    while (jobs_completed != num) 
     pthread_cond_wait(&cv, &mtx); 

    // we own this coming out, so let it go. 
    pthread_mutex_unlock(&mtx); 
} 

// main entry point. 
int main() 
{ 
    // number of threads in our crew 
    static const size_t N = 7; 
    pthread_t thrds[N] = {0}; 

    // startup thread crew. 
    intptr_t id = 0; 
    for (size_t i=0; i<N; ++i) 
     pthread_create(thrds + i, NULL, worker_proc, (void*)(++id)); 

    // run through batches. each batch is one larger 
    // than the prior batch. this should result in some 
    // interesting job-counts per-thread. 
    for (int i=0; i<64; ++i) 
     run_batch(i); 

    // flag for shutdown state. 
    pthread_mutex_lock(&mtx); 
    finished = true; 
    pthread_cond_broadcast(&cv); 
    pthread_mutex_unlock(&mtx); 
    for (size_t i=0; i<N; pthread_join(thrds[i++], NULL)); 

    return 0; 
} 

样本输出#1

3: jobs completed = 256 
6: jobs completed = 282 
5: jobs completed = 292 
2: jobs completed = 242 
1: jobs completed = 339 
4: jobs completed = 260 
7: jobs completed = 409 

样品输出#2

6: jobs completed = 882 
1: jobs completed = 210 
4: jobs completed = 179 
5: jobs completed = 178 
2: jobs completed = 187 
7: jobs completed = 186 
3: jobs completed = 194 

样本输出#3

1: jobs completed = 268 
6: jobs completed = 559 
3: jobs completed = 279 
5: jobs completed = 270 
2: jobs completed = 164 
4: jobs completed = 317 
7: jobs completed = 159 

固定批量

相同的代码,但改变这一点:

for (int i=0; i<64; ++i) 
    run_batch(i); 

这样:

for (int i=0; i<64; ++i) 
    run_batch(N); 

给出以下,这可能是更接近你真正想要什么。

样本输出#1

4: jobs completed = 65 
2: jobs completed = 63 
5: jobs completed = 66 
3: jobs completed = 63 
1: jobs completed = 64 
7: jobs completed = 63 
6: jobs completed = 64 

样本输出#2

3: jobs completed = 65 
5: jobs completed = 62 
1: jobs completed = 67 
7: jobs completed = 63 
2: jobs completed = 65 
6: jobs completed = 61 
4: jobs completed = 65 

样本输出#3

2: jobs completed = 58 
4: jobs completed = 61 
5: jobs completed = 69 
7: jobs completed = 68 
3: jobs completed = 61 
1: jobs completed = 64 
6: jobs completed = 67