2017-10-12 54 views
0

我有一个制片人试图/消费者线程通知工作不为我的消费

生产者

#pragma once 
#ifndef PRODUCER_H 
#define PRODUCER_H 

#include <thread> 
#include "Mailbox.h" 

class Producer 
{ 
private: 
    std::thread producer; 
    Mailbox& mailbox; 
public: 
    Producer(Mailbox& newmailbox); 
    ~Producer(); 
    void start(); 
    void run(); 
}; 

Producer::Producer(Mailbox& newMailbox) : mailbox(newMailbox) {} 

Producer::~Producer() {} 

void Producer::start() 
{ 
    producer = std::thread(&Producer::run, this); 
} 

void Producer::run() 
{ 
    mailbox.inc(); 
} 

#endif 

消费者

#pragma once 
#ifndef CONSUMER_H 
#define CONSUMER_H 

#include "Mailbox.h" 
#include <thread> 
#include <iostream> 

class Consumer 
{ 
private: 
    Mailbox& mailbox; 
    std::thread consumer; 
public: 
    Consumer(Mailbox& newMailbox); 
    ~Consumer(); 
    void start(); 
    void run(); 
}; 

Consumer::Consumer(Mailbox& newMailbox) : mailbox(newMailbox) {} 


Consumer::~Consumer() {} 

void Consumer::start() 
{ 
    consumer = std::thread(&Consumer::run, this); 
} 

void Consumer::run() 
{ 
    mailbox.read(); 
} 

#endif 

邮箱

#pragma once 
#ifndef MAILBOX_H 
#define MAILBOX_H 

#include <mutex> 
#include <iostream> 

class Mailbox 
{ 
private: 
    int& mailbox; 
    int init_val; 
    std::mutex mmutex; 
    std::condition_variable condition; 
public: 
    Mailbox(); 
    ~Mailbox(); 
    void inc(); 
    void read(); 
}; 

Mailbox::Mailbox() : mailbox(init_val), init_val(0) {} 

Mailbox::~Mailbox() 
{ 

} 

void Mailbox::inc() 
{ 
    int count = 0; 
    while (count < 10) 
    { 

     std::unique_lock<std::mutex> lock(mmutex); 
     std::cout << "Producer increment\n"; 
     mailbox += 1; 
     lock.unlock(); 

     count += 1; 
    } 

} 

void Mailbox::read() 
{ 
    int count = 0; 
    while (count < 10) 
    { 

     std::unique_lock<std::mutex> lock(mmutex); 

     condition.wait(lock, [this](){return get_cflag(); }); 

     condition.notify_one(); 

     count += 1; 

    } 

} 

#endif 

主要

int main() 
{ 

    Mailbox* mailbox = new Mailbox(); 
    Consumer* consumer = new Consumer(*mailbox); 
    Producer* producer = new Producer(*mailbox); 

    consumer->start(); 
    producer->start(); 

    return 0; 
} 

互斥锁尽管异步的作品,因为我有过当std::thread将开始,所以我决定实现除使用std::unique_lockstd::mutex半同步方法无法控制的。

问题是,消费者等待并且生产者在未提示的情况下飞行,至少这是调试器告诉我的情况,以及最后一个生产者迭代结果是否中止(),因此出现问题。

+0

无处做任何你的线程没有检查他们是否需要等待什么。具体来说,你的'read'函数调用'wait'而不检查是否需要等待,然后在'wait'返回之后继续执行,而不检查是否需要等待。条件变量是无状态的 - 在决定调用“wait”之前以及在决定继续之前,检查是否需要等待两者。 –

+0

@DavidSchwartz给出您的评论,我做了一些更多的研究,并阅读了“C++ Concurrency In Action(安东尼威廉姆斯)一个平易近人的解决方案。 – Mushy

+0

@WorldSEnder我将int邮箱引用更正为int&mailbox和我尚未完成的副本分配。 – Mushy

回答

0

我不是C++的人,但是如果这些条件变量按照我认为他们的方式工作,只有在信号到达时才会收到通知而您正在等待。如果信号到达之前您开始等待,您将无限期阻止。

当您获取'Mailbox :: read`中的锁定后,您应该检查项目是否可用,并且只有在条件变量上等待,如果不是之一。如果有,继续前进,把它:

int Mailbox::read() 
{ 
    std::unique_lock<std::mutex> lock(m); 
    while (mailbox <= 0) 
     condition.wait(lock); 
    return mailbox--; 
} 
+0

即使他正在等待的东西已经发生,他的'read'函数会调用'wait'。当等待返回时,它假设他正在等待的事情已经发生。你*必须*检查 - 条件变量不知道你正在等待的东西是否已经发生。跟踪它是你的工作。 –

+0

我改变了生产者和消费者函数,使用一个简单的标志来表示等待lambda表达式的信号,以表示准备或等待。谢谢迈克和大卫。 – Mushy

0

基于戴维·施瓦茨的评论,从迈克·斯特罗贝尔,以及额外的研究见解,我改变了生产者和消费者功能

生产者

void Mailbox::inc() 
{ 
    int count = 0; 
    while (count < 10) 
    { 

     std::unique_lock<std::mutex> lock(mmutex); 
     std::cout << "Producer increment\n"; 
     mailbox += 1; 
     lock.unlock(); 
     set_cflag(true); // signal to the consumer data is ready 
     condition.notify_one(); 

     { 
      std::unique_lock<std::mutex> lock(mmutex); 
      condition.wait(lock, [this]() {return get_pflag(); }); 
     } 

     set_pflag(false); 

     count += 1; 
    } 

} 

消费者

void Mailbox::read() 
{ 
    int count = 0; 
    while (count < 10) 
    { 

     std::unique_lock<std::mutex> lock(mmutex); 

     condition.wait(lock, [this](){return get_cflag(); }); 

     std::cout << "Consumer: " << mailbox << "\n"; 
     lock.unlock(); 

     set_pflag(true); 
     condition.notify_one(); 

     count += 1; 

     set_cflag(false); 

    } 

} 

邮箱

class Mailbox 
{ 
private: 
    int& mailbox; 
    int cflag, pflag; 
    int init_val; 
    std::mutex mmutex; 
    std::condition_variable condition; 
public: 
    Mailbox(); 
    ~Mailbox(); 
    int get_cflag() { return cflag; } 
    void set_cflag(int newFlag) { cflag = newFlag; } 
    int get_pflag() { return pflag; } 
    void set_pflag(int newFlag) { pflag = newFlag; } 
    void inc(); 
    void read(); 
}; 

Mailbox::Mailbox() : mailbox(init_val), init_val(0), cflag(0), pflag(0) {} 

Mailbox::~Mailbox() 
{ 

} 

在执行时的输出是作为所期望我

int main() 
{ 

    Mailbox* mailbox = new Mailbox(); 
    Consumer* consumer = new Consumer(*mailbox); 
    Producer* producer = new Producer(*mailbox); 

    consumer->start(); 
    producer->start(); 

    fgetc(stdin); 

    return 0; 
} 

生产者增量

消费者:1

生产者增量

消费者:2

生产者增量

消费者:3

生产者增量

消费者:4

生产者增量

消费者:5

生产者增量

消费者:6

生产者增量

消费者:7

生产者增量

消费者:8

生产者增量

消费者:9

生产者增量

消费:10

+0

我不能调用'std :: thread.join()'作为'std :: thread.terminate()',可能已经调用类线程退出函数并且程序终止了 – Mushy

+0

我不清楚为什么生产者和消费者线程自己加入。除了无操作或死锁之外,还有什么影响?通常,_spawns_工作线程执行加入的线程。 –