2016-05-11 20 views
1

我想要做的是将整数推送到我的线程安全队列实现多线程,并与另一系列的线程同时弹出插入的数字。所有这些操作都必须是线程安全的,但我想要的另一个选项是队列的大小必须是固定的,就像缓冲区一样。如果缓冲区已满,所有推送线程必须等待弹出线程释放一些插槽。固定大小的线程安全队列

这是我的队列/缓冲区的实现,它似乎工作,但经过几次迭代后,它停止并保持阻塞,没有任何错误。

#include <queue> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 

template <typename T> 

class Queue 
{ 
private: 
    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable cond_; 

public: 

    T pop() 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 

     cond_.wait(mlock, [this]{return !queue_.empty();}); 

     auto val = queue_.front(); 
     queue_.pop(); 
     return val; 
    } 

    void pop(T& item) 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 

     cond_.wait(mlock, [this]{return !queue_.empty();}); 

     item = queue_.front(); 
     queue_.pop(); 
    } 

    void push(const T& item, int buffer) 
    { 
     std::unique_lock<std::mutex> mlock(mutex_); 

     while (queue_.size() >= buffer) 
     { 
      cond_.wait(mlock); 
     } 

     queue_.push(item); 
     mlock.unlock(); 
     cond_.notify_one(); 
    } 

    Queue()=default; 
    Queue(const Queue&) = delete;   // disable copying 
    Queue& operator=(const Queue&) = delete; // disable assignment 

}; 

缓冲区的大小是在带有可变缓冲区的推送函数中定义的。这是使用的例子:

void prepare(Queue<int>& loaded, int buffer, int num_frames) 
{ 
    for (int i = 0; i < num_frames; i++) 
    { 
     cout<< "push "<<i<<endl; 
     loaded.push(i, buffer); 
    } 
} 

void load (vector<Frame>& movie, Queue<int>& loaded, int num_frames, 
          int num_points, int buffer, int height, int width) 
    { 
     for (int i = 0; i < num_frames; i++) 
     { 
      int num = loaded.pop(); 
      cout<< "pop "<<num<<endl; 
    } 
} 

int main() 
{ 
    srand(time(NULL)); 

    int num_threadsXstage = 4; 

    int width = 500; 
    int height = 500; 

    int num_points = width * height; 

    int num_frames = 100; 

    int frames_thread = num_frames/num_threadsXstage; 

    int preset = 3; 

    int buffer = 10; 

    //Vectors of threads 
    vector<thread> loader; 

    //Final vector 
    vector<Frame> movie; 
    movie.resize(num_frames); 

    //Working queues 
    Queue<int> loaded; 

    //Prepare loading queue task 
    thread preparator(prepare, ref(loaded), buffer, num_frames); 

    for (int i = 0; i < num_threadsXstage; i++) 
    { 
     //stage 1 
     loader.push_back(thread(&load, ref(movie), ref(loaded), frames_thread, 
           num_points, buffer, height, width)); 

    } 


    // JOIN 
    preparator.join(); 

    join_all(loader); 

    return 0; 
} 
+1

您在'pop()'中缺少一个'notify()'来发信号通知'push()'它现在很容易推送。另外,正如已经指出的那样,您需要在通知时保持锁定状态。 – TFM

回答

2

pop功能可以允许一个线程等待push做出向前进步,但他们不调用任何notify功能。任何时候您都可以调用适当的notify函数,以使条件变量上的线程被阻止,从而取得进展。

虽然解释原因相当复杂,但您应该拨打电话notify_all或拨打电话notify_one,同时保持锁定状态。理论上可以“唤醒错误的线程”,否则因为两个谓词使用相同的条件变量(队列不为空,队列未满)。

为了避免很难理解的故障模式,始终做这三两件事之一:

  1. 不要使用同一个条件变量来处理多个断言。例如,使用一个条件变量作为“不空”,另一个作为“不满”;
  2. 始终使用notify_all,从不notify_one;或
  3. 在持有互斥锁时始终调用通知函数。

只要你遵循这三个规则中的至少一个,你会避免在你醒来只选择了睡觉,你发布的互斥体后,同时留下一个能够处理的唯一线程的线程一个不起眼的故障模式状况仍然受阻。

+0

好的,谢谢,对不起,但我对多线程和条件变量很陌生,我不完全理解如何修改我的代码,以及哪三个更适合我的问题。 – CIVI89

+0

@ CIVI89最重要的是在弹出时调用通知函数。这将解决导致你提出这个问题的问题。至于要使用哪三种方法,其中的任何一种都可以在你的情况下正常工作。您可以使用两个条件变量,在锁定时总是发出信号,或者每次都调用notify_all。这三件事中的任何一件都可以解决你可能永远不会遇到的罕见的竞争条件。 ;) –

+0

如果您在未保持锁定的情况下指示条件变量,则在谓词检查和条件变量的等待操作之间发送信号时,信号可能会丢失。它独立于使用多个条件变量或者在notify_one之上使用'notify_all'。或者我错过了什么?我认为这是将等待操作与锁关联的关键。 – TFM