2016-09-27 142 views
0

我目前正在重构一些代码,我在nvidia硬件编码器中找到压缩视频图像。原来的问题是在这里:wondering if I can use stl smart pointers for this线程安全缓冲区阵列

基础上的答案,我已经更新了我的代码如下:

基础上的答案和意见,我试图让一个线程安全缓冲区数组。这里是。请给出意见。

#ifndef __BUFFER_ARRAY_H__ 
#define __BUFFER_ARRAY_H__ 

#include <vector> 
#include <mutex> 
#include <thread> 

template<class T> 
class BufferArray 
{ 
public: 
    class BufferArray() 
     :num_pending_items(0), pending_index(0), available_index(0) 
    {} 

    // This method is not thread-safe. 
    // Add an item to our buffer list 
    // Note we do not take ownership of the incoming pointer. 
    void add(T * buffer) 
    { 
     buffer_array.push_back(buffer); 
    } 

    // Returns a naked pointer to an available buffer. Should not be 
    // deleted by the caller. 
    T * get_available() 
    { 
     std::lock_guard<std::mutex> lock(buffer_array_mutex); 
     if (num_pending_items == buffer_array.size()) { 
      return NULL; 
     }  
     T * buffer = buffer_array[available_index]; 
     // Update the indexes. 
     available_index = (available_index + 1) % buffer_array.size(); 
     num_pending_items += 1; 
     return buffer; 
    } 

    T * get_pending() 
    { 
     std::lock_guard<std::mutex> lock(buffer_array_mutex); 
     if (num_pending_items == 0) { 
      return NULL; 
     } 

     T * buffer = buffer_array[pending_index]; 
     pending_index = (pending_index + 1) % buffer_array.size(); 
     num_pending_items -= 1; 
     return buffer; 
    } 


private: 
    std::vector<T * >     buffer_array; 
    std::mutex       buffer_array_mutex; 
    unsigned int      num_pending_items; 
    unsigned int      pending_index; 
    unsigned int      available_index; 

    // No copy semantics 
    BufferArray(const BufferArray &) = delete; 
    void operator=(const BufferArray &) = delete; 
}; 

#endif 

我的问题是我是否在这里打破了一些C++的良好做法建议?另外,我正在开发这个类,以便它可以被访问并使用我的多个线程。我想知道是否有什么我可能错过了。

+1

你'加()'不是线程安全的。它还需要一名锁守员。否则,这个问题太模糊,模糊。 –

+3

与您的问题无关,但在所有范围内保留以双下划线开头或以下划线后跟大写字母开头的符号。请参阅[这个旧的答案](http://stackoverflow.com/a/228797/440558)了解更多信息。 –

+0

@JoachimPileborg但我没有使用任何开头的单下划线或双下划线?你的意思是包括后卫? – Luca

回答

1

我想我会接近它是这样的:

在这个测试中,“处理”只是乘以一个int通过2.但是请注意处理器的线程是如何发生挂起的数据从一个等待队列,流程它然后将可用数据推送到可用队列。然后它(通过条件变量)表示消费者(在这种情况下,您的磁盘写入器)应该再次查看可用数据。

#include <vector> 
#include <mutex> 
#include <thread> 
#include <queue> 
#include <condition_variable> 
#include <iostream> 

namespace notstd { 
    template<class Mutex> auto getlock(Mutex& m) 
    { 
     return std::unique_lock<Mutex>(m); 
    } 
} 

template<class T> 
class ProcessQueue 
{ 
public: 
    ProcessQueue() 
    {} 

    // This method is not thread-safe. 
    // Add an item to our buffer list 
    // Note we do not take ownership of the incoming pointer. 
    // @pre start_processing shall not have been called 
    void add(T * buffer) 
    { 
     pending_.push(buffer); 
    } 

    void start_processing() 
    { 
     process_thread_ = std::thread([this] { 
      while(not this->pending_.empty()) 
      { 
       auto lock = notstd::getlock(this->mutex_); 
       auto buf = this->pending_.front(); 
       lock.unlock(); 

       // 
       // this is the part that processes the "buffer" 

       *buf *= 2; 

       // 
       // now notify the structure that the processing is done - buffer is available 
       // 

       lock.lock(); 
       this->pending_.pop(); 
       this->available_.push(buf); 
       lock.unlock(); 
       this->change_.notify_one(); 
      } 
     }); 
    } 

    T* wait_available() 
    { 
     auto lock = notstd::getlock(mutex_); 
     change_.wait(lock, [this] { return not this->available_.empty() or this->pending_.empty(); }); 
     if (not available_.empty()) 
     { 
      auto p = available_.front(); 
      available_.pop(); 
      return p; 
     } 

     lock.unlock(); 
     process_thread_.join(); 
     return nullptr; 
    } 

private: 
    std::queue<T * >     pending_; 
    std::queue<T * >     available_; 
    std::mutex       mutex_; 
    std::condition_variable    change_; 
    std::thread      process_thread_; 

    // No copy semantics - implicit because of the mutex 
}; 

int main() 
{ 
    ProcessQueue<int> pq; 

    std::vector<int> v = { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; 
    for (auto& i : v) { 
     pq.add(std::addressof(i)); 
    } 

    pq.start_processing(); 

    while (auto p = pq.wait_available()) 
    { 
     std::cout << *p << '\n'; 
    } 
} 

预期输出:

2 
4 
6 
8 
10 
12 
14 
16 
18 
+0

谢谢。我从这段代码中学到很多东西! – Luca