2016-07-03 38 views
2

我试图与多生产者和消费者做一个代码。我为生产者和消费者创建了多线程,并使用信号量进行同步。代码与单个生产者和消费者一起工作正常。多生产者 - 消费者执行的效率

我面对的问题是,在程序执行一段时间后,只有consumer1和producer1参与了这个过程。我无法理解其他生产者和消费者发生了什么事。

我还想知道如何让多生产者 - 消费者问题高效?所有的生产者和消费者分别得到平等的生产和消费机会的意义上的有效性? C++代码(它包括了很多的C):

#include <iostream> 
#include <pthread.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <queue> 
using namespace std; 
sem_t empty; 
sem_t full; 
int cnt = 0; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
queue<int> q; 
void *producer(void *a) 
{ 
    int *num = (int *)a; 
    while(1) { 
     sem_wait(&empty); 
     pthread_mutex_lock(&mutex); 
     cnt = cnt+1; 
     q.push(cnt); 
     cout<<cnt<<" item produced by producer "<<(*num+1)<<endl; 
     pthread_mutex_unlock(&mutex); 
     sem_post(&full); 
     sleep(1); 
    } 
} 
void *consumer(void *a) 
{ 
    int *num = (int *)a; 
    while(1) { 
     sem_wait(&full); 
     pthread_mutex_lock(&mutex); 
     cout<<q.front()<<" item consumed by consumer "<<(*num+1)<<endl; 
     q.pop(); 
     pthread_mutex_unlock(&mutex); 
     sem_post(&empty); 
     sleep(1); 
    } 
} 
int main() 
{ 
    pthread_t p[5]; 
    pthread_t c[5]; 
    sem_init(&empty,0,5); 
    sem_init(&full,0,0); 
    int i; 
    for(i = 0; i < 5; i++) { 
     pthread_create(&p[i],NULL,producer,(void *)(&i)); 
    } 
    for(i = 0; i < 5; i++) { 
     pthread_create(&c[i],NULL,consumer,(void *)(&i)); 
    } 
    for(i = 0; i < 5; i++) { 
     pthread_join(p[i],NULL); 
     pthread_join(c[i],NULL); 
    } 
} 

更新代码:

#include <iostream> 
#include <pthread.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <queue> 
#include <map> 
using namespace std; 
sem_t empty; 
sem_t full; 
int cnt = 0; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
map<pthread_t,int> mc,mp; 
queue<int> q; 
void *producer(void *a) 
{ 
    while(1) { 
     sem_wait(&empty); 
     pthread_mutex_lock(&mutex); 
     cnt = cnt+1; 
     q.push(cnt); 
     cout<<cnt<<" item produced by producer "<<mp[pthread_self()]<<endl; 
     pthread_mutex_unlock(&mutex); 
     sem_post(&full); 
     sleep(1); 
    } 
} 
void *consumer(void *a) 
{ 
    while(1) { 
     sem_wait(&full); 
     pthread_mutex_lock(&mutex); 
     cout<<q.front()<<" item consumed by consumer "<<mc[pthread_self()]<<endl; 
     q.pop(); 
     pthread_mutex_unlock(&mutex); 
     sem_post(&empty); 
     sleep(1); 
    } 
} 
int main() 
{ 
    pthread_t p[5]; 
    pthread_t c[5]; 
    sem_init(&empty,0,5); 
    sem_init(&full,0,0); 
    int i; 
    pthread_mutex_lock(&mutex); 
    for(i = 0; i < 5; i++) { 
     pthread_create(&p[i],NULL,producer,NULL); 
     pthread_create(&c[i],NULL,consumer,NULL); 
     mc[c[i]] = i+1; 
     mp[p[i]] = i+1; 
    } 
    pthread_mutex_unlock(&mutex); 
    for(i = 0; i < 5; i++) { 
     pthread_join(p[i],NULL); 
     pthread_join(c[i],NULL); 
    } 
} 
+0

看看您传递给pthread_create的参数以及您如何在线程函数中使用该信息。考虑是否存在竞争条件。考虑传递'i'与传递'&i'的影响。 –

+0

任何理由使用操作系统特定的'pthread'而不是标准的'std :: thread'? – Christophe

+0

@Christophe:因为我没有在C++中学过线程,所以我使用了c版本。 –

回答

2

简短的回答

的线程并在事实上执行与平等的机会,但他们只是打印出一个不是他们的标识符。

详细解释

您保持在每个线程的指针num到线程数。这是指向保存的值的指针,而不是值本身。所以所有的线程指向同一个计数器,想要找到他们自己的标识符。

每次访问*num时,您都可以访问的不是i在启动线程时的值,而是其当前值。

不幸的是,在每个循环的main()中,都重用了变量i。所以最后一个循环,你会将i设置回0,并等待第一个线程加入。但是所有这些线程都会永久循环,所以循环很难有机会超出这个初始值。因此,每个线程都认为这是数字*num+1,此时为1。

请注意,您在注释中指出某人创建竞争条件的方式:所有消费者和生产者线程取消引用指针,访问受互斥锁保护区域中的相同变量。还行吧。但是,当他们正在读取变量时,主线程仍然很乐意将共享变量更改为任何锁之外。这绝对是一场种族风险。

解决方法

std::thread将允许您通过walue通过i,让每个线程拥有的是ID自己的不变副本。

使用pthreads,您必须将指针传递给值。不幸的是,即使你在线程开始的时候做了一个值指向的本地拷贝,你仍然处于竞争状态。

一个快速的解决方案,以观察哪个线程真的在做这项工作将打印输出结果pthread_self()(见here如何做)。或者将id存储在int数组中,并将每个线程的地址传递给该数组中的唯一元素。

+0

thread_join如何改变初始值?我不明白吗? –

+0

@ShivamMitra thread_join并没有改变任何东西,但是你在一个以'for(i = 0;'和不幸的开头的循环开始,它和我指向的所有num指针都是一样的。 – Christophe

+0

任何关于如何打印哪个线程已经产生或消耗了? –