2014-10-11 106 views
0

我的代码只用于一个生产者 - 一个消费者的情况。pthread_cond_wait fifo循环队列中的死锁

这里是我的测试代码:

static void *afunc(void * arg) { 
    Queue* q = arg; 
    for(int i= 0; i< 100000; i++) { 
     *queue_pull(q) = i; //get one element space 
     queue_push(q);  //increase the write pointer 
    } 
    return NULL; 
} 
static void *bfunc(void * arg) { 
    Queue* q = arg; 
    for(;;) { 
     int *i = queue_fetch(q); //get the first element in queue 
     printf("%d\n", *i); 
     queue_pop(q); //increase the read pointer 
    } 
} 
int main() { 
    Queue queue; 
    pthread_t a, b; 
    queue_init(&queue); 
    pthread_create(&a, NULL, afunc, &queue); 
    pthread_create(&b, NULL, bfunc, &queue); 

    sleep(100000); 
    return 0; 
} 

,这里是循环队列

#define MAX_QUEUE_SIZE 3 
typedef struct Queue{ 
    int data[MAX_QUEUE_SIZE] ; 
    int read,write; 
    pthread_mutex_t mutex, mutex2; 
    pthread_cond_t not_empty, not_full; 
}Queue; 
int queue_init(Queue *queue) { 
    memset(queue, 0, sizeof(Queue)); 
    pthread_mutex_init(&queue->mutex, NULL); 
    pthread_cond_init(&queue->not_empty, NULL); 
    pthread_mutex_init(&queue->mutex2, NULL); 
    pthread_cond_init(&queue->not_full, NULL); 
    return 0; 
} 
int* queue_fetch(Queue *queue) { 
    int* ret; 
    if (queue->read == queue->write) { 
     pthread_mutex_lock(&queue->mutex); 
     pthread_cond_wait(&queue->not_empty, &queue->mutex); 
     pthread_mutex_unlock(&queue->mutex); 
    } 
    ret = &(queue->data[queue->read]); 
    return ret; 
} 
void queue_pop(Queue *queue) { 
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
    pthread_cond_signal(&queue->not_full); 
} 
int* queue_pull(Queue *queue) { 
    int* ret; 
    if ((queue->write+1)%MAX_QUEUE_SIZE == queue->read) { 
     pthread_mutex_lock(&queue->mutex2); 
     pthread_cond_wait(&queue->not_full, &queue->mutex2); 
     pthread_mutex_unlock(&queue->mutex2); 
    } 
    ret = &(queue->data[queue->write]); 
    return ret; 
} 
void queue_push(Queue *queue) { 
     nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE); 
     pthread_cond_signal(&queue->not_empty); 
} 

后片刻的实施,似乎两个子线程会变成僵局..

编辑:我使用两个信号量,但它也有一些问题..它很漂亮 奇怪,如果只是执行./main,它似乎很好,但如果我重定向到一个文件,如./main> A.TXT,然后WC -l A.TXT,结果不等于所述排队号码..

int queue_init(Queue *queue) { 
    memset(queue, 0, sizeof(Queue)); 
    pthread_mutex_init(&queue->mutex, NULL); 
    sem_unlink("/not_empty"); 
    queue->not_empty = sem_open("/not_empty", O_CREAT, 644, 0); 
    sem_unlink("/not_full"); 
    queue->not_full = sem_open("/not_full", O_CREAT, 644, MAX_QUEUE_SIZE); 
    return 0; 
} 

int* queue_fetch(Queue *queue) { 
    sem_wait(queue->not_empty); 
    return &(queue->data[queue->read]); 
} 
void queue_pop(Queue *queue) { 
    nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
    sem_post(queue->not_full); 
} 

int* queue_pull(Queue *queue) { 
    sem_wait(queue->not_full); 
    return &(queue->data[queue->write]); 
} 
void queue_push(Queue *queue) { 
    nx_atomic_set(queue->write, (queue->write+1)%MAX_QUEUE_SIZE); 
    sem_post(queue->not_empty); 
} 
+0

pthreads条件不是标志......它们不保留“设置”。如果您没有等待*条件发出信号*,您将错过信号。 – Dmitri 2014-10-11 05:46:45

+0

更好的解决方案吗?一个sem_t是不够的 – user3682618 2014-10-11 06:03:11

+0

如果你打算使用信号量,一个sem_t和一个互斥对于一个无界队列就足够了(对任何数量的生产者/消费者来说都是安全的)。如果你需要一个有界的队列,你需要另一个sem_t。 – 2014-10-11 13:21:03

回答

1

很有可能你的线程中的一个等待以信号的条件信号发生后,导致两个线程无限期地等待对方。

Pthreads条件变量不保持发信号 - 信号是一个瞬间动作。条件变量不用来决定是否等待 - 它只是用来唤醒已经在等待的线程;您需要使用不同的方法来确定是否等待,如检查标志或某种测试条件。

通常情况下,你的信号如下:

  1. 锁定互斥
  2. 做你的更新,一般留下您的测试条件“真”(例如,设置你的标志)
  3. 呼叫pthread_cond_signal()pthread_cond_broadcast()
  4. 解锁互斥

...并等待如下:

  1. 锁定互斥
  2. 循环,直到你的测试表达式是“真”(例如:直到您的标志被设置),仅当测试为假时才呼叫pthread_cond_wait()(在循环内)。
  3. 循环结束后,当您的测试成功时,请执行您的工作。
  4. 解锁互斥

例如,信号可能会去是这样的:

pthread_mutex_lock(&mtx);  /* 1: lock mutex */ 
    do_something_important(); /* 2: do your work... */ 
    ready_flag = 1;    /* ...and set the flag */ 
    pthread_cond_signal(&cond); /* 3: signal the condition (before unlocking) */ 
pthread_mutex_unlock(&mtx); /* 4: unlock mutex */ 

,并等待可能是这样的:

pthread_mutex_lock(&mtx);   /* 1: lock mutex */ 
    while (ready_flag == 0)   /* 2: Loop until flag is set... */ 
    pthread_cond_wait(&cond, &mtx); /* ...waiting when it isn't */ 
    do_something_else();    /* 3: Do your work... */ 
    ready_flag = 0;      /* ...and clear the flag if it's all done */ 
pthread_mutex_unlock(&mtx);   /* 4: unlock mutex */ 

服务员不会错过这样的条件,因为互斥锁确保服务员的测试和等待以及信号员的设置和信号不能同时发生。


queue_fetch()功能的这一部分:

if (queue->read == queue->write) { 
    pthread_mutex_lock(&queue->mutex); 
    pthread_cond_wait(&queue->not_empty, &queue->mutex); 
    pthread_mutex_unlock(&queue->mutex); 
} 
ret = &(queue->data[queue->read]); 

..might被改写为:

pthread_mutex_lock(&queue->mutex); 
    while (queue->read == queue->write) 
     pthread_cond_wait(&queue->not_empty, &queue->mutex); 
    ret = &(queue->data[queue->read]); 
pthread_mutex_unlock(&queue->mutex); 

...其中:

  1. 上锁/解锁互斥体的方法是在附近移动,所以直到条件等待开始
  2. if万一条件等待过早中断变更为while互斥体,而测试表达式,并仍持有举行
  3. queue->readqueue->write访问完成与互斥锁举行

类似的更改将作出queue_pull()

对于信令码,的queue_pop()以下部分:

nx_atomic_set(queue->read, (queue->read+1)%MAX_QUEUE_SIZE); 
pthread_cond_signal(&queue->not_full); 

..might改为:

pthread_mutex_lock(&queue->mutex); 
    queue->read = (queue->read + 1) % MAX_QUEUE_SIZE; 
    pthread_cond_signal(&queue->not_full); 
pthread_mutex_unlock(&queue->mutex); 

..where:

  1. 互斥是在发信号通知情况下保持(这确保了在决定是否等待的服务员之间不能发出信号)真正开始等待,因为服务员将持有该间隔期间互斥)
  2. 互斥举行而改变queue->read以及而非因为信号的条件时,无论如何

类似的变化需要互斥使用nx_atomic_set()将作出queue_push()


此外,(访问readwrite时,使相同的互斥总是保持),你应该只使用一个单一的互斥体,一旦while循环被添加到条件等待几乎没有令人信服的理由使用更多比一个条件变量。如果切换到一个条件变量,刚刚完成的等待后再次信号的条件:

pthread_mutex_lock(&queue->mutex); 
    while (queue->read == queue->write) { 
     pthread_cond_wait(&queue->cond, &queue->mutex); 
     pthread_cond_signal(&queue->cond); /* <-- signal next waiter, if any */ 
    } 
    ret = &(queue->data[queue->read]); 
pthread_mutex_unlock(&queue->mutex); 
+0

你的回答是对的,但是你有什么建议? – user3682618 2014-10-11 07:30:07

1

你操纵的互斥体外面的队列的状态,这是天生racey。

我会建议使用单个互斥量,但每当您更改或测试读取&写入标记时都会采用它。这也意味着你不需要原子集。

+0

单个互斥体对此不够,因为这里将存储器函数(fetch&pull)和索引函数(pop&push)分开,在将内存从队列中拉出后,我会对存储器做一些耗时的计算,比增加索引 – user3682618 2014-10-11 07:07:57

+0

从队列中获取元素后,不需要锁定互斥锁。只有在测试或更改读取和写入标记时才需要保持互斥锁。除非你打算互斥体覆盖队列元素的*内容* – Rich 2014-10-11 14:55:12