2017-09-07 26 views
2

我的代码获取图像并对其进行处理。性能对我的代码至关重要,所以我尝试了多线程。目前,我只是把收购部分作为一个独立的线程。我正在使用std::queue实现一个简单的FIFO缓冲区,用于存储获取的图像。采集功能AcquireImages无限期地将原始图像数据写入该缓冲区,直到用户中断。处理函数,ProcessImages读取缓冲区并处理图像数据(目前在主线程中,但我计划将其作为单独的线程以及一旦我解决了问题)。这里是我的代码(改性形成MCV example):如何读取与另一个线程共享的std :: queue?

#include <iostream> 
#include <vector> 
#include <queue> 
#include <atomic> 
#include <thread> 

#define NUM_CAMERAS 2 

void AcquireImages(std::queue<unsigned char*> &rawImageQueue, std::atomic<bool> &quit) 
{ 
    unsigned char* rawImage{}; 

    while (!quit) 
    { 
     for (int camera = 0; camera < NUM_CAMERAS; camera++) 
     { 
      switch (camera) 
      { 
      case 0: 
       rawImage = (unsigned char*)"Cam0Image"; 
       break; 
      case 1: 
       rawImage = (unsigned char*)"Cam1Image"; 
       break; 
      default: 
       break; 
      } 

      rawImageQueue.push(std::move(rawImage)); 
     } 
    } 
} 

int ProcessImages(const std::vector<unsigned char*> &rawImageVec, const int count) 
{ 
    // Do something to the raw image vector 

    if (count > 10) 
    { 
     return 1; 
    } 
    else 
    { 
     return 0; 
    } // In my application, this function only returns non-zero upon user interception. 
} 


int main() 
{ 
    // Preparation 
    std::vector<unsigned char*> rawImageVec; 
    rawImageVec.reserve(NUM_CAMERAS); 
    std::queue<unsigned char*> rawImageQueue; 
    int count{}; 

    const unsigned int nThreads = 1; // this might grow later 

    std::atomic<bool> loopFlags[nThreads]; 
    std::thread  threads[nThreads]; 

    // Start threads 
    for (int i = 0; i < nThreads; i++) { 
     loopFlags[i] = false; 
     threads[i] = std::thread(AcquireImages, rawImageQueue, ref(loopFlags[i])); 
    } 

    // Process images 
    while (true) 
    { 

     // Process the images 
     for (int cam{}; cam < NUM_CAMERAS; ++cam) 
     { 
      rawImageVec.push_back(rawImageQueue.front()); 
      rawImageQueue.pop(); 
     } 

     int processResult = ProcessImages(move(rawImageVec), count); 
     if (processResult) 
     { 
      std::cout << "Leaving while loop.\n"; // In my application this is triggered by the user 
      break; 
     } 

     rawImageVec.clear(); 
     ++count; 
    } 

    // Shutdown other threads 
    for (auto & flag : loopFlags) { 
     flag = true; 
    } 

    // Wait for threads to actually finish. 
    for (auto& thread : threads) { 
     thread.join(); 
    } 

    return 0; 
} 

你们有些人可能已经注意到我的失误。我所知道的是,这个程序在rawImageVec.push_back(rawImageQueue.front());处抛出异常。

抛出异常后输出内容如下:

Debug Assertion Failed! 

Program: C:\WINDOWS\SYSTEM32\MSVCP140D.dll 
File: c:\program files (x86)\microsoft visual studio 14.0\vc\include\deque 
Line: 329 

Expression: deque iterator not dereferencable 

我认识这个问题的原因可能是,我正在读与另一个线程共享的东西(对吗?)。我该如何解决这个问题?

我跟着Praetorian的建议在评论后,检查rawImageQueue是否为空,我发现它总是空的。我不确定是什么原因造成的。

+3

用互斥体或类似的方法保护队列。 –

+1

在调用'rawImageQueue.front()'之前,您没有检查队列是否为空。并且从任一线程进入队列的所有访问必须受互斥体保护。 – Praetorian

+1

阅读有关“生产者消费者模式”(例如https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem),解决当前问题后首先要考虑的问题 - 当生产者生产速度更快时会发生什么比消费者消费 –

回答

2

以下是共享队列上生产者/消费者的一般示例。这个想法是,如果你正在从一个数据结构中读写,你需要对访问进行一些保护。

为此,下面的示例使用条件变量和互斥锁。

#include <thread> 
#include <iostream> 
#include <chrono> 
#include <queue> 
#include <mutex> 
#include <vector> 
#include <condition_variable> 

using namespace std::chrono_literals; 
using std::vector; 
using std::thread; 
using std::unique_lock; 
using std::mutex; 
using std::condition_variable; 
using std::queue; 

class WorkQueue 
{ 
    condition_variable work_available; 
    mutex work_mutex; 
    queue<int> work; 

public: 
    void push_work(int item) 
    { 
    unique_lock<mutex> lock(work_mutex); 

    bool was_empty = work.empty(); 
    work.push(item); 

    lock.unlock(); 

    if (was_empty) 
    { 
     work_available.notify_one(); 
    }  
    } 

    int wait_and_pop() 
    { 
    unique_lock<mutex> lock(work_mutex); 
    while (work.empty()) 
    { 
     work_available.wait(lock); 
    } 

    int tmp = work.front(); 
    work.pop(); 
    return tmp; 
    } 
}; 

int main() { 
    WorkQueue work_queue; 

    auto producer = [&]() { 
    while (true) { 
     work_queue.push_work(10); 
     std::this_thread::sleep_for(2ms); 
    } 
    }; 

    vector<thread> producers; 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 

    std::thread consumer([&]() {   
    while (true) 
    { 
     int work_to_do = work_queue.wait_and_pop(); 
     std::cout << "Got some work: " << work_to_do << std::endl; 
    } 
    }); 

    std::for_each(producers.begin(), producers.end(), [](thread &p) { 
    p.join(); 
    });  

    consumer.join(); 
} 
+1

这应该解决OP对空队列的特殊问题。虽然值得一提的是你的队列的大小是无限的,但如果工作产生得比消耗的速度快,它将会耗尽内存。 –

+1

@Gruffalo这是一个很好的观点。在控制源代码的最简单的实现中,您可以向容器类添加队列大小,并且可以推送阻止生产者。尽管如此,我认为处理背压依赖于实施细节。如果数据源在特定程序之外(例如任何类型的分布式系统),则需要有一个退避协议,如tcp窗口。 – Josh

+0

您可能会失去'wait_and_pop'的'notify_one'触发器,因为您在'unlock' – curiousguy12

1

你的情况相对简单,因为你似乎只有一个生产者和一个消费者。此外,图像处理听起来很慢(足够慢,不用担心线程争用),并且您正在从单线程版本切换,因此可能不需要打扰高效的无锁实现。

我建议研究这个伪代码:https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem#Using_monitors,然后在需要时了解条件变量:http://en.cppreference.com/w/cpp/thread/condition_variable

+0

链接无疑是有帮助的,尽管我需要更多的时间来使用我的代码才能真正实现它的功能。我也发现'rawImageQueue'总是空的。 – db7638

+0

我希望你喜欢做得好,避免重新发明,因为多线程可能会很棘手。 –

+0

这就是意图。我不认为我正在重新编写我的代码的任何部分。我仍然不明白为什么'rawImageQueue'是空的。 – db7638

相关问题