2012-12-29 35 views
0

我的程序生产者线程从文本文件(具有大约8000行文本)中读取文本行并将行加载到 并发队列中。单一生产者 - 多个消费者:如何告诉消费者生产已完成

三个使用者线程从队列中读取每行写入单独文件的行。

当我运行程序只有生产者线程和只有一个消费者线程完成。其他两个线程 似乎挂起。

如何可靠地告诉所有消费者线程已达到文件末尾,因此应返回 但确保队列完全为空。

我的平台是Windows 7 64位

VC11。

编译为64位和32位的代码得到相同的行为。

这是代码。 (这是自包含的,编译)

#include <queue> 
#include<iostream> 
#include<fstream> 
#include <atomic> 
#include <thread> 
#include <condition_variable> 
#include <mutex> 
#include<string> 
#include<memory> 


template<typename Data> 
class concurrent_queue 
{ 
private: 
    std::queue<Data> the_queue; 
    mutable std::mutex the_mutex; 
    std::condition_variable the_condition_variable; 
public: 
    void push(Data const& data){ 
     { 
      std::lock_guard<std::mutex> lock(the_mutex); 
      the_queue.push(data); 
     } 
     the_condition_variable.notify_one(); 
    } 

    bool empty() const{ 
     std::unique_lock<std::mutex> lock(the_mutex); 
     return the_queue.empty(); 
    } 

    const size_t size() const{ 
     std::lock_guard<std::mutex> lock(the_mutex); 
     return the_queue.size(); 
    } 

    bool try_pop(Data& popped_value){ 
     std::unique_lock<std::mutex> lock(the_mutex); 
     if(the_queue.empty()){ 
      return false; 
     } 
     popped_value=the_queue.front(); 
     the_queue.pop(); 
     return true; 
    } 

    void wait_and_pop(Data& popped_value){ 
     std::unique_lock<std::mutex> lock(the_mutex); 
     while(the_queue.empty()){ 
      the_condition_variable.wait(lock); 
     } 
     popped_value=the_queue.front(); 
     the_queue.pop(); 
    } 
}; 

std::atomic<bool> done(true); 
typedef std::vector<std::string> segment; 
concurrent_queue<segment> data; 
const int one_block = 15; 

void producer() 
{ 
    done.store(false); 
    std::ifstream inFile("c:/sample.txt"); 
    if(!inFile.is_open()){ 
     std::cout << "Can't read from file\n"; 
     return; 
    } 

    std::string line; 
    segment seg; 
    int cnt = 0; 
    while(std::getline(inFile,line)){ 
     seg.push_back(line); 
     ++cnt; 
     if(cnt == one_block){ 
      data.push(seg); 
      seg.clear(); 
      cnt = 0; 
     } 
    } 
    inFile.close(); 
    done.store(true); 
    std::cout << "all done\n"; 
} 

void consumer(std::string fname) 
{ 
    std::ofstream outFile(fname.c_str()); 
    if(!outFile.is_open()){ 
     std::cout << "Can't write to file\n"; 
     return; 
    } 

    do{ 
     while(!data.empty()){ 
      segment seg; 
      data.wait_and_pop(seg); 
      for(size_t i = 0; i < seg.size(); ++i) 
      { 
       outFile << seg[i] << std::endl; 
      } 
      outFile.flush(); 
     } 
    } while(!done.load()); 
    outFile.close(); 
    std::cout << fname << " done.\n"; 
} 

int main() 
{ 
    std::thread th0(producer); 
    std::thread th1(consumer, "Worker1.txt"); 
    std::thread th2(consumer, "Worker2.txt"); 
    std::thread th3(consumer, "Worker3.txt"); 

    th0.join(); 
    th1.join(); 
    th2.join(); 
    th3.join(); 

    return 0; 
} 

回答

1

我用来终止等待队列的所有线程的方法是在队列上标记一个标志,说明在检查pop()函数中是否存在元素之前,它是否已完成测试。如果该标志指示程序应停止,则任何调用pop()的线程将在队列中没有元素时抛出异常。当标志被改变时,改变的线程只会在相应的条件变量上调用notify_all()

+0

你看过我的std :: atomic bool的用法吗?基本上这是我的完成标志,但它没有给我想要的结果 – user841550

+0

@SteveJessop:如果队列中有对象,线程只是提取对象并对其进行处理。如果队列中没有其他元素,线程就会看到它不应该等待并抛出。 –

+0

@DietmarKühl:哦,是的,对不起。 –

0

请看下面的代码:

while(!data.empty()){ 
    segment seg; 
    data.wait_and_pop(seg); 
    ... 

考虑的情况下数据的最后一段被读取。并且消费者th1 &​​正在等待数据被读取。

消费者th1检查!data.empty(),发现有数据需要读取。然后在th1调用data.wait_and_pop()之前,消费者​​检查!data.empty()并发现它是正确的。假设消费者th1消耗最后一部分。现在,由于没有要读取的段,所以​​无限期地等待the_queue.empty(),data.wait_and_pop()

试试这个代码,而不是上面的代码片段:

segment seg; 
while(data.try_pop(seg)){ 
    ... 

应该得到它的工作。

+0

我给这个一杆 – user841550

+0

这是否工作的任何更新.. – user1055604

+0

没有,这对我不起作用。事实上,它导致消费者线程退出而没有做任何工作。不过,我可能误解了你的建议。 – user841550

0

您可能要将一个布尔标志添加到concurrent_queue。一旦文件被读取,就设置它(在互斥体下)。一旦文件被读取队列为空,从使用notify_all清空队列的使用者广播条件变量。

这会唤醒所有其他消费者,这些消费者需要发现最终条件(标志设置和队列空)并退出循环。为了避免竞争条件,这意味着他们需要在首先等待之前检查相同的组合条件。

现有标志存在的问题是线程永远不会等待condvar,从不检查它。 “完成”标志需要成为他们正在等待的状态的一部分。

[编辑:迪特马尔对标志巧妙地不同的含义可能导致简单的代码,但我没有写他们两个比较]

+0

我正在使用std ::原子作为完成标志,但它没有给我想要的结果。你能发现我做错了什么? – user841550

+0

@ user841550:已更新。挂起的线程在'wait_and_pop',对不对?如果不是,那么我误会了某些东西。 –