我的程序生产者线程从文本文件(具有大约8000行文本)中读取文本行并将行加载到 并发队列中。单一生产者 - 多个消费者:如何告诉消费者生产已完成
三个使用者线程从队列中读取每行写入单独文件的行。
当我运行程序只有生产者线程和只有一个消费者线程完成。其他两个线程 似乎挂起。
如何可靠地告诉所有消费者线程已达到文件末尾,因此应返回 但确保队列完全为空。
我的平台是Windows 7 64位
VC11。
编译为64位和32位的代码得到相同的行为。
这是代码。 (这是自包含的,编译)
#include <queue>
#include<iostream>
#include<fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include<string>
#include<memory>
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_condition_variable;
public:
void push(Data const& data){
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push(data);
}
the_condition_variable.notify_one();
}
bool empty() const{
std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
const size_t size() const{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
if(the_queue.empty()){
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
while(the_queue.empty()){
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
};
std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;
void producer()
{
done.store(false);
std::ifstream inFile("c:/sample.txt");
if(!inFile.is_open()){
std::cout << "Can't read from file\n";
return;
}
std::string line;
segment seg;
int cnt = 0;
while(std::getline(inFile,line)){
seg.push_back(line);
++cnt;
if(cnt == one_block){
data.push(seg);
seg.clear();
cnt = 0;
}
}
inFile.close();
done.store(true);
std::cout << "all done\n";
}
void consumer(std::string fname)
{
std::ofstream outFile(fname.c_str());
if(!outFile.is_open()){
std::cout << "Can't write to file\n";
return;
}
do{
while(!data.empty()){
segment seg;
data.wait_and_pop(seg);
for(size_t i = 0; i < seg.size(); ++i)
{
outFile << seg[i] << std::endl;
}
outFile.flush();
}
} while(!done.load());
outFile.close();
std::cout << fname << " done.\n";
}
int main()
{
std::thread th0(producer);
std::thread th1(consumer, "Worker1.txt");
std::thread th2(consumer, "Worker2.txt");
std::thread th3(consumer, "Worker3.txt");
th0.join();
th1.join();
th2.join();
th3.join();
return 0;
}
你看过我的std :: atomic bool的用法吗?基本上这是我的完成标志,但它没有给我想要的结果 –
user841550
@SteveJessop:如果队列中有对象,线程只是提取对象并对其进行处理。如果队列中没有其他元素,线程就会看到它不应该等待并抛出。 –
@DietmarKühl:哦,是的,对不起。 –