2016-11-28 187 views
0

我正在研究一个问题,我正在实现一个模仿生产者 - 消费者范例的程序。当我只有一个生产者和一个消费者时,我使用的代码有效,但当我添加另一个生产者和另一个消费者时,它不起作用。C生产者 - 消费者使用PThreads

我花了一段时间在这个,似乎无法弄清楚为什么我得到错误Synchronization Error: Producer x Just overwrote x from Slot x。我通过各种测试跟踪了这个问题,问题在于生产者在注意到另一个生产者处于关键部分时没有被阻止。

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 

void *producer (void *) ; 
void *consumer(void *) ; 
sem_t empty, full, mutex ; 

int buffer[10] /*note extra long space!*/ ; 
int ID[10] ; 
int in = 0 ; int out = 0 ; 
int BUFFER_SIZE = 10 ; 
int nextProduced = 0 ; 

main() { 
    int i ; 
    pthread_t TID[10] ; 

    sem_init(&empty, 0, 10) ; 
    sem_init(&full, 0, 0) ; 
    sem_init(&mutex, 0, 1) ; 

    for(i = 0; i < 10; i++) { 
     ID[i] = i ; 
     buffer[i] = -1 ; 
    } 

    //for(i = 0; i < 5000; i += 2) { 
     pthread_create(&TID[0], NULL, producer, (void *) &ID[0]) ; 
     printf("Producer ID = %d created!\n", 0) ; 
     pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]) ; 
     printf("Consumer ID = %d created!\n", 1) ; 

     pthread_create(&TID[2], NULL, producer, (void *) &ID[2]) ; 
     printf("Producer ID = %d created!\n", 2) ; 
     pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]) ; 
     printf("Consumer ID = %d created!\n", 3) ; 
    //} 

    for(i = 0; i < 10 ; i++) { 
     pthread_join(TID[i], NULL) ; 
    } 
} 

void *producer(void *Boo) { 
    int *ptr; 
    int ID; 
    ptr = (int *) Boo; 
    ID = *ptr; 
    while (1) { 
     nextProduced++; //Producing Integers 
     /* Check to see if Overwriting unread slot */ 
     sem_wait(&empty); 
     sem_wait(&mutex); 

     if (buffer[in] != -1) { 
      printf("Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(0); 
     } 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     sem_post(&mutex); 
     sem_post(&full); 
    } 
} 

void *consumer (void *Boo) { 
    static int nextConsumed = 0 ; 
    int *ptr ; 
    int ID ; 
    ptr = (int *) Boo ; 
    ID = *ptr ; 
    while (1) { 
     sem_wait(&full); 
     sem_wait(&mutex); 

     nextConsumed = buffer[out]; 
     /*Check to make sure we did not read from an empty slot*/ 
     if (nextConsumed == -1) { 
      printf("Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out) ; 
      exit(0) ; 
     } 
     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out) ; 
     buffer[out] = -1 ; 
     out = (out + 1) % BUFFER_SIZE; 

     sem_post(&mutex); 
     sem_post(&empty); 
    } 
} 

输出继电器:

Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
Synchronization Error: Producer 2 Just overwrote 2 from Slot 1 
Consumer 1 Just consumed item 2 from slot 1 
Consumer ID = 3 created! 
incremented in! 
Consumer 3 Just consumed item 2 from slot 1 
Synch Error: Consumer 1 Just Read from empty slot 2 
Producer 0. Put 4 in slot 2 

正如你所看到的,生产者0设法把2插槽1中。然而,生产者0之前可以增加in,制片人2次尝试将数据写入插槽1因为in未增加。

出于某种原因,似乎我的sem_waits()不起作用。有人可以帮我吗?

回答

0

我重写了你的代码在我的系统上运行,它似乎工作正常。

具体变化:因为我在OSX上,所以我需要从sem_init()更改为sem_open()sem_unlink();为了适应这种变化和一般的代码,我添加了一个interupt hander,这样打字^ C将使消费者和生产者完成并允许pthread_join()和任何下面的清理代码运行;你会将进程的数量与看起来独立的缓冲区数量相联系(请参阅你的ID和缓冲区初始化代码) - 我把它们分开了;在互斥体内移动nextProduced++;各种随机风格调整:

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 
#include <signal.h> 

#define MAX_THREADS 5 
#define BUFFER_SIZE 10 

sem_t *empty, *full, *mutex; 

int buffer[BUFFER_SIZE]; 
int in = 0, out = 0; 

static volatile int keepRunning = 1; 

void intHandler(int dummy) { 
    keepRunning = 0; 
} 

void *producer(void * id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextProduced = 0; 

    while (keepRunning) { 

     (void) sem_wait(empty); 
     (void) sem_wait(mutex); 

     /* Check to see if Overwriting unread slot */ 
     if (buffer[in] != -1) { 
      fprintf(stderr, "Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(1); 
     } 

     nextProduced++; // Producing Integers 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(full); 
    } 

    return NULL; 
} 

void *consumer (void *id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextConsumed = 0; 

    while (keepRunning) { 

     (void) sem_wait(full); 
     (void) sem_wait(mutex); 

     nextConsumed = buffer[out]; 

     /* Check to make sure we did not read from an empty slot */ 
     if (nextConsumed == -1) { 
      fprintf(stderr, "Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out); 
      exit(1); 
     } 

     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out); 
     buffer[out] = -1; 
     out = (out + 1) % BUFFER_SIZE; 
     printf("incremented out!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(empty); 
    } 

    return NULL; 
} 

int main() { 
    int ID[MAX_THREADS]; 
    pthread_t TID[MAX_THREADS]; 

    empty = sem_open("/empty", O_CREAT, 0644, 10); 
    full = sem_open("/full", O_CREAT, 0644, 0); 
    mutex = sem_open("/mutex", O_CREAT, 0644, 1); 

    signal(SIGINT, intHandler); 

    for (int i = 0; i < MAX_THREADS; i++) { 
     ID[i] = i; 
    } 

    for (int i = 0; i < BUFFER_SIZE; i++) { 
     buffer[i] = -1; 
    } 

    pthread_create(&TID[0], NULL, producer, (void *) &ID[0]); 
    printf("Producer ID = %d created!\n", 0); 
    pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]); 
    printf("Consumer ID = %d created!\n", 1); 

    pthread_create(&TID[2], NULL, producer, (void *) &ID[2]); 
    printf("Producer ID = %d created!\n", 2); 
    pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]); 
    printf("Consumer ID = %d created!\n", 3); 

    for (int i = 0; i < 4; i++) { 
     pthread_join(TID[i], NULL); 
    } 

    (void) sem_unlink("/empty"); 
    (void) sem_unlink("/full"); 
    (void) sem_unlink("/mutex"); 

    return 0; 
} 

输出

> ./a.out 
Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
incremented in! 
Producer 2. Put 3 in slot 2 
incremented in! 
Consumer ID = 3 created! 
Producer 0. Put 4 in slot 3 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
incremented out! 
Consumer 3 Just consumed item 2 from slot 1 
incremented out! 
Producer 2. Put 5 in slot 4 
incremented in! 
Producer 0. Put 6 in slot 5 
incremented in! 
Consumer 1 Just consumed item 3 from slot 2 
incremented out! 
Consumer 3 Just consumed item 4 from slot 3 
incremented out! 
Producer 2. Put 7 in slot 6 
incremented in! 
Producer 0. Put 8 in slot 7 
incremented in! 
Consumer 1 Just consumed item 5 from slot 4 
incremented out! 
Consumer 3 Just consumed item 6 from slot 5 
...