我已经想出了以下阻塞队列实现,与std :: vector作为存储在队列中的元素的容器和使用Boost进行线程/同步。我也提到了一个类似的帖子here。C++阻塞队列与升压
template<typename T>
class BlockingQueue
{
public:
explicit BlockingQueue(const std::vector<T>& buf):
buffer(buf)
{}
explicit BlockingQueue(): buffer()
{}
void push(const T& elem);
T pop();
~BlockingQueue()
{}
private:
boost::mutex mutex; // mutex variable
boost::condition_variable_any notEmptyCond; // condition variable, to check whether the queue is empty
std::vector<T> buffer;
};
template<typename T>
void BlockingQueue<T>::push(const T& elem)
{
boost::mutex::scoped_lock lock(mutex);
buffer.push_back(elem);
notEmptyCond.notify_one(); // notifies one of the waiting threads which are blocked on the queue
// assert(!buffer.empty());
}
template<typename T>
T BlockingQueue<T>::pop()
{
boost::mutex::scoped_lock lock(mutex);
notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); }); // waits for the queue to get filled and for a notification, to resume consuming
T elem = buffer.front();
buffer.erase(buffer.begin());
return elem;
}
我有两个线程(生产者/消费者),从文件中读取一个字符串,并将其填充到BlockingQueue的,另请从BlockingQueue的字符串,并打印出来。这两个都是从一个定义如下的类中初始化的。
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{
rt = boost::thread(boost::bind(&FileVerifier::read, this));
pt1 = boost::thread(boost::bind(&FileVerifier::process, this));
}
volatile ~FileProcessor()
{
rt.interrupt();
pt1.interrupt();
rt.join();
pt1.join();
}
/* Read strings from a file, populate them in the blocking-queue */
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
boost::iostreams::filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
/* Process the elements (dequeue and print) */
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
boost::mutex mutex;
boost::thread rt;
boost::thread pt1;
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
我看到下面的输出(只包括快照):
运行1:
inserted AZ
inserted yezjAdCeV
inserted icKU
inserted q
inserted b
inserted DRQL
inserted aaOj
inserted CqlNRv
inserted e
inserted XuDemby
inserted rE
inserted YPk
inserted dLd
inserted xb
inserted bSrZdf
inserted sCQiRna
...
4趟:
consumed jfRnjSxrw
inserted INdmXSCr
consumed oIDlu
inserted FfXdARGu
consumed tAO
inserted mBq
consumed I
inserted aoXNhP
consumed OOAf
inserted Qoi
consumed wCxJXGWJu
inserted WZGYHluTV
consumed oIFOh
inserted kkIoFF
consumed ecAYyjHh
inserted C
consumed KdrBIixw
inserted Ldeyjtxe
...
我的问题:消费者线程有时候会控制队列的资源(能够出队和打印),有时却不是。我不知道为什么会发生这种情况。任何有关队列设计缺陷的提示都将不胜感激。谢谢!
观察:
当线程不从(FileProcessor)类的构造函数初始化,他们像预期的那样,即它们访问BlockingQueue的资源,做他们的读/写操作。请参阅下面的代码片段,了解发生这种行为的变化。
生产者 - 消费者线程不采取替代轮流,因为@ n.m注意到生产者不明确向消费者屈服。按照上述的观察,它们各自的输出分别为类似下面
inserted DZxcOw consumed inserted DZxcOw consumed robECjOp robECjOp inserted BaILFsVaA inserted HomURR inserted PVjLPb consumed BaILFsVaA consumed HomURR consumed PVjLPb inserted SHdBVSEyU consumed SHdBVSEyU consumed JaEH inserted JaEH inserted g inserted MwEgOVB inserted qlohoszv consumed g consumed MwEgOVB consumed qlohoszv consumed AsQgq inserted AsQgq inserted tbm inserted iriADeEL inserted Zoxs consumed tbm
给出的一个从一类构造函数外初始化。
#include <iostream>
#include <threading/file_processor.h> //has the FileProcessor class declaration
int main()
{
FileProcessor fp; //previously, I had only this statement which called the class constructor, from where the threads were initialized.
boost::thread rt(boost::bind(&FileProcessor::read, &fp));
boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
rt.join();
pt1.join();
return 0;
}
修改FileProcessor类(取消了它的构造函数线程初始化)
#include <boost/iostreams/filtering_stream.hpp>
#include <threading/blocking_queue.h> //has the BlockingQueue class
using namespace boost::iostreams;
class FileProcessor
{
public:
explicit FileProcessor():bqueue(),inFile("random.txt")
{}
~FileProcessor()
{}
void read()
{
std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
filtering_istream in;
if (file.fail()) {
std::cout << "couldn't open the input file.. please check its name and read permissions\n";
return;
}
try {
in.push(file);
for(std::string inputStr; std::getline(in,inputStr);)
{
bqueue.push(inputStr);
std::cout << "inserted " << inputStr << "\n";
}
}
catch(std::exception& e) {
std::cout << "exception occurred while reading file\n" << e.what() << "\n";
}
}
void process()
{
while (true)
{
std::string rstr = bqueue.pop();
std::cout << "consumed " << rstr << "\n";
}
}
private:
BlockingQueue<std::string> bqueue;
std::string inFile; // input file name from where the strings are read
};
编辑:
2017年5月24日:删除了不准确的评论“获得了整个文件内容为一个缓冲区“。
目前尚不清楚你的意思。请描述您观察的内容。尽量不要做出任何结论,只要告诉我们你看到了什么(程序输出,调试器输出等等)。如果没有必要参与猜谜游戏,[mcve]将是一个很好的方式让其他人查看你的问题。 –
你应该考虑'boost :: concurrent :: sync_queue'。 – sbabbi
生产者/消费者是shared_mutex和读/写锁的经典场景。 – didiz