2015-06-26 45 views
-2

我写了一个生产者/消费者程序使用mutexcondition。 它使用全球int来生产&消费值。 有1个消费者线程和多个生产者线程。消费者/生产者在两端等待

规则:

  1. 当值过小,则消费者将等待。

  2. 当价值太大时,生产者会等待。

我的问题是:

我们知道,消费者通常需要等待,但生产商依赖。
在我的例子中,他们都需要检查条件,并可能相互等待,这是一个很好的做法吗?
在我的后续实现中可能导致死锁吗?

代码:

// condition test, a producer/consumer program, 

#include <stdio.h> 
#include <pthread.h> 
#include <stdlib.h> 
#include <string.h> 
#include <errno.h> 

static int glob = 0; // global variable, shared by threads, 
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; 
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 

/** 
* increase once, with lock, it's producer, 
* 
* @param arg 
* {max, point} 
* where: 
*  max, max value that would increase to, 
*  point, is min value that would trigger consume, 
* 
* @return 
* 0, not changed, 
* >0, increased, 
* <0, error, 
*/ 
static void *inc(void *arg) { 
    int *args = (int *)arg; 
    int max = args[0]; 
    int point = args[1]; 

    int result; 
    int n = 0; 

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock 
     printf("error to get lock: %d\n", result); 
     pthread_exit(NULL); // terminate if error, 
    } else { 
     while(glob >= max) { 
      if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait 
       printf("failed to wait for condition: %d\n", result); 
       return (void *)-1; 
      } 
     } 

     // do jobs, 
     glob++; // this will be compiled into multiple lines in machine code, so it's not automic, 
     n = 1; 
     /* 
     printf("inc by 1, %d\n", glob); 
     fflush(stdout); 
     */ 

     if(glob >= point) { // condition signal 
      if((result = pthread_cond_signal(&cond)) !=0) { 
       printf("error to condition signal: %d\n", result); 
       return (void *)-1; 
      } else { 
       // printf("condition signal, from thread [%d], value: %d\n", (int)pthread_self(), glob); 
      } 
     } 

     if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock 
      printf("error to unlock: %d\n", result); 
      return (void *)-1; 
     } 
    } 

    return (void *)n; 
} 

// increase loop, 
static void *inc_loop(void *arg) { 
    int result; 
    while(1) { 
     if((result = (int)inc(arg)) < 0) { 
      return (void *)result; 
     } 
    } 
} 

/** 
* consumer, with lock, 
* 
* @param arg 
* {point, steps} 
* where: 
*  point, is min value that would trigger consume, 
*  steps, is the count each consume would take, 
* 
* @return 
* 0, not consumed, 
* >0, consumed, 
* <0, error, 
*/ 
static void *consume(void *arg) { 
    int *args = (int *)arg; 
    int point = args[0]; 
    int step = args[1]; 
    int result; 
    int n = 0; 

    if((result = pthread_mutex_lock(&mtx)) != 0) { // lock 
     printf("error to get lock: %d\n", result); 
     pthread_exit(NULL); // terminate if error, 
    } else { 
     while(glob < point) { 
      pthread_cond_broadcast(&cond); // broadcast 
      printf("broadcast, and sleep,\n"); 

      if((result = pthread_cond_wait(&cond, &mtx)) != 0) { // wait 
       printf("failed to wait for condition: %d\n", result); 
       return (void *)-1; 
      } 
     } 

     // do job 
     printf("going to perform consume: %d -> ", glob); 
     glob-=(glob>=step?step:glob); 
     printf("%d\n", glob); 
     n = 1; 

     if((result = pthread_mutex_unlock(&mtx)) != 0) { // unlock 
      printf("error to unlock: %d\n", result); 
     } 
    } 

    return (void *)n; 
} 

// condition test 
int condition_test(void *(*func_inc_loop) (void *), void *(*func_consume) (void *), int thread_count, int max, int point, int consume_count, int step) { 
    pthread_t threads[thread_count]; 

    int result, i; 
    int inc_args[] = { 
     max, // max value that would increase to, 
     point // min value that would trigger consume, 
    }; 

    // start threads 
    for(i=0; i<thread_count; i++) { 
     if((result = pthread_create(threads+i, NULL, func_inc_loop, inc_args)) != 0) { 
      printf("error create thread [%d]: %d\n", i, result); 
     } 
    } 

    int loops = 0; 
    int consume_args[] = { 
     point, // min point to trigger consume, 
     step // consume steps 
    }; 

    // begin consume loop, 
    while(loops < consume_count) { 
     if(func_consume(consume_args) > 0) { 
      loops++; 
     } 
    } 

    printf("\nDone.\n"); 

    return 0; 
} 

/** 
* command line: 
* ./a.out <[thread_count]> <[max]> <[point]> <[consume_count]> 
*/ 
int main(int argc, char *argv[]) { 
    int thread_count = 3; 
    int max = 1000; 
    int point = 100; 
    int consume_count = 10; // how many times consume execute, 
    int step = 200; // max count in each consume, 

    if(argc >= 2) { 
     thread_count = atoi(argv[1]); 
    } 
    if(argc >= 3) { 
     max = atoi(argv[2]); 
    } 
    if(argc >= 4) { 
     point = atoi(argv[3]); 
    } 
    if(argc >= 5) { 
     consume_count = atoi(argv[4]); 
    } 
    if(argc >= 6) { 
     step = atoi(argv[5]); 
    } 

    condition_test(&inc_loop, &consume, thread_count, max, point, consume_count, step); 

    return 0; 
} 

编译:

gcc -pthread xxx.c

执行:

./a.out

+0

为什么关闭.....? –

回答

1

实际上,您不应该使用互斥锁来解决生产者/消费者或读写器问题。它不一定会导致僵局,但可能导致生产者或消费者的饥饿。

我使用了类似的方法来编码读写器锁。

你可以检查出来: https://github.com/prathammalik/OS161/blob/master/kern/thread/synch.c

+0

只有使用互斥量,是​​饿死的,但是通过wait&notify阻止无终止循环的条件,会提高性能,是吗?无论如何,谢谢,我会稍后尝试'semphore'。 –