2013-07-02 18 views
1

嗨快速问题给予以下c + + 11代码,它适用于生产者/消费者,罚款是我想关闭DataQueue并停止所有消费者。虽然问题在于问题在于消费者只是称为popWait()并且可以被阻止。在这种情况下,如何关闭我的客户?这可能是一个需要纠正的设计问题。我试图不引起任何性能命中,因为此代码应理想地使用干扰模式或类似的方式使队列锁定为空闲状态。因为它可能是我想知道是否有简单的东西让消费者知道在生产者关闭时停止调用pop等待功能。如果队列中还有数据需要等待消费者完成数据撤销,那么棘手的部分就是关闭。我相信我有一个解决方案,消费者可以自行关闭,但可以接受想法。提前致谢。生产者消费者线程与关机c + +

#ifndef __DataQueue_h__ 
#define __DataQueue_h__ 

#include <mutex> 
#include <queue> 
#include <condition_variable> 
#include <chrono> 

template <typename DataT> 
class DataQueue 
{ 
    public: 

    DataQueue(): _shutdown(false), _waitTime(5), _itemAvailable() {} 

    void push (const DataT& data)  
    { 
    std::unique_lock<std::mutex> lock(_mutex); 
    queue.push(data); 
    _itemAvailable.notify_one(); 
    } 

// worked fine until I need to shutdown services... then some were blocked 
    DataT popWait() 
    { 
    std::unique_lock<std::mutex> lock(_mutex); 

    if(queue.empty()) 
    { 
     _itemAvailable.wait(lock); 
    } 

    DataT temp(queue.front()); 
    queue.pop(); 

    return temp; 
    } 

    inline void shutdown() 
    { 
    _shutdown = true; 
    } 

    private: 
    std::queue<DataT> queue; 
    bool _shutdown; 
    unsigned int _waitTime; 
    std::mutex _mutex; 
    std::condition_variable _itemAvailable; 

}; 

#endif 
+1

等待等待通知... notify_one通知一个... notify_all通知所有服务员... – kfsone

回答

1

一个想法是唤醒所有消费者在致电shutdown 。在popWait方法中,您可以检查当您从wait返回时是否设置了关闭标志。从所有的同步和信令东西popWait

除此之外的

#include <atomic> 
#include <mutex> 
#include <queue> 
#include <condition_variable> 
#include <chrono> 

template <typename DataT> 
class DataQueue 
{ 
public: 
    DataQueue(): _shutdown(false), _itemAvailable() {} 

    void push (const DataT& data)  
    { 
     std::unique_lock<std::mutex> lock(_mutex); 
     queue.push(data); 
     _itemAvailable.notify_one(); 
    } 

    Maybe<DataT> popWait() 
    { 
     std::unique_lock<std::mutex> lock(_mutex); 

     while(queue.empty() && !_shutdown) 
     { 
      _itemAvailable.wait(lock); 
     } 

     Maybe<DataT> data; 
     // leave pending data in the queue 
     if (_shutdown) 
     { 
      // consumers should stop polling when receiving an 'empty' value 
      return data; 
     } 

     data.add(queue.front()); 
     queue.pop(); 
     return data; 
    } 

    inline void shutdown() 
    { 
     _shutdown = true; 
     _itemAvailable.notify_all(); 
    } 

private: 
    std::queue<DataT> queue; 
    std::atomic<bool> _shutdown; 
    std::mutex _mutex; 
    std::condition_variable _itemAvailable; 
}; 

返回值,你也不得不重新考虑的popWait返回值。如果要实施通用的shutdown()方法,即不将特殊哨兵值填入队列本身,则popWait必须能够返回指示生产者已停止的“值” - 可能是类似templaty的东西,如Maybe<DataT> 。我想Maybe<DataT>可能会返回DataT或什么也没有,在这种情况下,消费者会停止投票。

template<typename DataT> 
class Maybe 
{ 
    DataT _data; 
    bool _empty; 

pulic: 
    Maybe() : _data(), _empty(true) {}; 

    void add(const DataT& raData) 
    { 
     _data=raData; 
     _empty=false; 
    } 

    bool isEmpty() const 
    { 
     return _empty; 
    } 

    DataT get() const 
    { 
     return _data; 
    } 
} 

这是一个相当原始的接口。你可以根据需要扩展它。

ComicSansMS向我指出,我应该声明_shutdown成员变量为std::atmic<bool>避免存储器记录的问题。感谢您的提醒。

我刚刚偶然发现了std::optional<T>(C++ 14的新增功能),这基本上是我的想法。

+1

在这种情况下'_shutdown'应该是'atomic ',否则你将冒着令人讨厌的内存排序错误。 – ComicSansMS

+0

感谢这是一个好主意。 – bjackfly

0

popWait,你可以检查队列为空或_shutDown是真实的。在后一种情况下,你通知呼叫者他不应再询问数据(抛出一个例外(我认为不是最优),或重新设计方法签名)。然后,您所要做的就是在shutdown()方法中拨打notifyAll()

1

您可以将“毒丸”对象推入队列。事实上,推动与您想要关闭的消费者数量一样多。这样,您也将摆脱关闭成员变量来检查卸载检查到接收线程,这是增加整体排队性能。

这是我为我的线程池实现做的方法,它完美地工作。

+0

你如何确定消费者数量而不引入竞赛?如果您事先知道消费者的数量(就像您为线程池所做的那样),但这种方法没有问题,但是无法正常工作。在线程检测到关闭状态后,您可以尝试重新推入毒丸,但这也很容易导致细微的比赛。全局关闭标志肯定更容易取消。 – ComicSansMS

+0

您可以推送另一个毒丸对象进行关机。在这种情况下,您推入一个物体,接收机在关闭之前将其推回。请注意,无论如何,只要您不知道消费者的确切数量,如果所有消费者都关闭,将很难发现。 – ogni42