2017-05-21 79 views
1

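I have come up with the following blocking queue implementation, with std::vector as the container for the queued elements and Boost for threading/synchronization. I also referred to a similar post here: C++ blocking queue with Boost.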

template<typename T> 
class BlockingQueue 
{ 
public: 
    explicit BlockingQueue(const std::vector<T>& buf): 
    buffer(buf) 
    {} 
    explicit BlockingQueue(): buffer() 
    {} 
    void push(const T& elem); 
    T pop(); 
    ~BlockingQueue() 
    {} 

private: 
    boost::mutex mutex;        // mutex variable 
    boost::condition_variable_any notEmptyCond;  // condition variable, to check whether the queue is empty 
    std::vector<T> buffer; 
}; 

template<typename T> 
void BlockingQueue<T>::push(const T& elem) 
{ 
    boost::mutex::scoped_lock lock(mutex); 
    buffer.push_back(elem); 
    notEmptyCond.notify_one();      // notifies one of the waiting threads which are blocked on the queue 
    // assert(!buffer.empty()); 
} 

template<typename T> 
T BlockingQueue<T>::pop() 
{ 
    boost::mutex::scoped_lock lock(mutex); 
    notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); }); // waits for the queue to get filled and for a notification, to resume consuming 
    T elem = buffer.front(); 
    buffer.erase(buffer.begin()); 
    return elem; 
} 

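I have two threads (producer/consumer): one reads strings from a file and pushes them into the BlockingQueue, the other pops strings from the BlockingQueue and prints them. Both are started from a class defined as follows.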

class FileProcessor 
{ 
public: 
    explicit FileProcessor():bqueue(),inFile("random.txt") 
    { 
    rt = boost::thread(boost::bind(&FileProcessor::read, this)); 
    pt1 = boost::thread(boost::bind(&FileProcessor::process, this)); 
    } 

    ~FileProcessor() 
    { 
    rt.interrupt(); 
    pt1.interrupt(); 
    rt.join(); 
    pt1.join(); 
    } 

    /* Read strings from a file, populate them in the blocking-queue */ 
    void read() 
    { 
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
    boost::iostreams::filtering_istream in; 
    if (file.fail()) { 
     std::cout << "couldn't open the input file.. please check its name and read permissions\n"; 
     return; 
    } 
    try { 
     in.push(file);      
     for(std::string inputStr; std::getline(in,inputStr);) 
     { 
     bqueue.push(inputStr); 
     std::cout << "inserted " << inputStr << "\n"; 
     } 
    } 
    catch(std::exception& e) { 
     std::cout << "exception occurred while reading file\n" << e.what() << "\n"; 
    } 
    } 

    /* Process the elements (dequeue and print) */ 
    void process() 
    { 
    while (true) 
    { 
     std::string rstr = bqueue.pop(); 
     std::cout << "consumed " << rstr << "\n"; 
    } 
    } 

private: 
    boost::mutex mutex; 
    boost::thread rt; 
    boost::thread pt1; 
    BlockingQueue<std::string> bqueue; 
    std::string inFile;  // input file name from where the strings are read 
}; 

I see the following output (snapshots only):

Run 1:

inserted AZ 
inserted yezjAdCeV 
inserted icKU 
inserted q 
inserted b 
inserted DRQL 
inserted aaOj 
inserted CqlNRv 
inserted e 
inserted XuDemby 
inserted rE 
inserted YPk 
inserted dLd 
inserted xb 
inserted bSrZdf 
inserted sCQiRna 
... 

Run 4:

consumed jfRnjSxrw 
inserted INdmXSCr 
consumed oIDlu 
inserted FfXdARGu 
consumed tAO 
inserted mBq 
consumed I 
inserted aoXNhP 
consumed OOAf 
inserted Qoi 
consumed wCxJXGWJu 
inserted WZGYHluTV 
consumed oIFOh 
inserted kkIoFF 
consumed ecAYyjHh 
inserted C 
consumed KdrBIixw 
inserted Ldeyjtxe 
... 

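My question: the consumer thread sometimes gets access to the queue (it can dequeue and print) and sometimes does not. I don't know why this happens. Any hints about flaws in the queue design would be greatly appreciated. Thanks!

Observations:

  1. When the threads are not started from the (FileProcessor) class constructor, they behave as expected, i.e. they get access to the BlockingQueue and perform their read/write operations. See the code snippets below for the change that produces this behavior.

  2. The producer and consumer threads do not take strict turns because, as @n.m noted, the producer never explicitly yields to the consumer. With the change from observation 1, their output looks like the following: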

    inserted DZxcOw 
    consumed inserted DZxcOw 
    consumed robECjOp 
    robECjOp 
    inserted BaILFsVaA 
    inserted HomURR 
    inserted PVjLPb 
    consumed BaILFsVaA 
    consumed HomURR 
    consumed PVjLPb 
    inserted SHdBVSEyU 
    consumed SHdBVSEyU 
    consumed JaEH 
    inserted JaEH 
    inserted g 
    inserted MwEgOVB 
    inserted qlohoszv 
    consumed g 
    consumed MwEgOVB 
    consumed qlohoszv 
    consumed AsQgq 
    inserted AsQgq 
    inserted tbm 
    inserted iriADeEL 
    inserted Zoxs 
    consumed tbm 
    

The code below starts the threads from outside the class constructor.

#include <iostream> 
#include <threading/file_processor.h> //has the FileProcessor class declaration 

int main() 
{ 
    FileProcessor fp; //previously, I had only this statement which called the class constructor, from where the threads were initialized. 
    boost::thread rt(boost::bind(&FileProcessor::read, &fp)); 
    boost::thread pt1(boost::bind(&FileProcessor::process, &fp)); 
    rt.join(); 
    pt1.join(); 
    return 0; 
} 

Modified FileProcessor class (thread initialization removed from its constructor):

#include <fstream> 
#include <iostream> 
#include <string> 
#include <boost/iostreams/filtering_stream.hpp> 
#include <threading/blocking_queue.h> //has the BlockingQueue class 

using namespace boost::iostreams; 

class FileProcessor 
{ 
public: 
    explicit FileProcessor():bqueue(),inFile("random.txt") 
    {} 

    ~FileProcessor() 
    {} 

    /* Read strings from a file, populate them in the blocking-queue */ 
    void read() 
    { 
        std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
        filtering_istream in; 
        if (file.fail()) { 
            std::cout << "couldn't open the input file.. please check its name and read permissions\n"; 
            return; 
        } 
        try { 
            in.push(file); 
            for(std::string inputStr; std::getline(in,inputStr);) 
            { 
                bqueue.push(inputStr); 
                std::cout << "inserted " << inputStr << "\n"; 
            } 
        } 
        catch(std::exception& e) { 
            std::cout << "exception occurred while reading file\n" << e.what() << "\n"; 
        } 
    } 

    /* Process the elements (dequeue and print) */ 
    void process() 
    { 
        while (true) 
        { 
            std::string rstr = bqueue.pop(); 
            std::cout << "consumed " << rstr << "\n"; 
        } 
    } 

private: 
    BlockingQueue<std::string> bqueue; 
    std::string inFile;  // input file name from where the strings are read 
}; 

Edit:

2017-05-24: Removed the inaccurate comment "gets the whole file content into an input stream buffer".

+1

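It is not clear what you mean. Please describe what you observe. Try not to draw any conclusions; just tell us what you see (program output, debugger output, etc.). To spare others a guessing game, a [mcve] would be a good way to let them look at your problem. –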

+1

You should consider `boost::concurrent::sync_queue`. – sbabbi

+0

Producer/consumer is a classic scenario for shared_mutex and read/write locks. – didiz

Answers

0

In fact, there is no design flaw; the flaw is only in the expectation of how the operating system schedules threads.

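Below is a version that adds a maximum queue "depth" (capacity) and makes push block when the queue is at capacity. The demo then uses a capacity of 1 to show perfect turn-by-turn consumption (which is, of course, suboptimal performance-wise).

I have replaced the _any condition variable with a regular one, because you can. I have dropped the use of iostreams for the moment (note that the comment // gets the whole file content into an input stream buffer was completely inaccurate anyway).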

Live On Coliru

#include <boost/thread.hpp> 
#include <cassert> 
#include <deque> 
#include <fstream> 
#include <iostream> 
#include <string> 

static boost::mutex s_iomutex; 

template <typename T> class BlockingQueue { 
    public: 
    explicit BlockingQueue(size_t capacity) : _buffer(), _capacity(capacity) { 
     assert(capacity>0); 
    } 

    void push(const T &elem) { 
     boost::unique_lock<boost::mutex> lock(_mutex); 
     _pop_event.wait(lock, [&] { return _buffer.size() < _capacity; }); 
     _buffer.push_back(elem); 
     _push_event.notify_one(); // notifies one of the waiting threads which are blocked on the queue 
     // assert(!_buffer.empty()); 
    } 

    T pop() { 
     boost::unique_lock<boost::mutex> lock(_mutex); 
     _push_event.wait(lock, [&] { return _buffer.size() > 0; }); 

     T elem = _buffer.front(); 
     _buffer.pop_front(); 
     _pop_event.notify_one(); 
     return elem; 
    } 

    private: 
    boost::mutex _mutex; 
    boost::condition_variable _push_event, _pop_event; 
    std::deque<T> _buffer; 
    size_t _capacity; 
}; 

class FileProcessor { 
    public: 
    explicit FileProcessor(size_t capacity = 10) : bqueue(capacity), inFile("random.txt") {} 

    /* Read strings from a file, populate them in the blocking-queue */ 
    void read() { 
     try { 
      std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary); 
      for (std::string inputStr; std::getline(file, inputStr);) { 
       bqueue.push(inputStr); 

       boost::lock_guard<boost::mutex> lock(s_iomutex); 
       std::cout << "inserted " << inputStr << "\n"; 
      } 
     } catch (std::exception &e) { 
      std::cerr << "exception occurred while reading file\n" << e.what() << "\n"; 
     } 
    } 

    /* Process the elements (dequeue and print) */ 
    void process() { 
     while (true) { 
      std::string rstr = bqueue.pop(); 
      boost::lock_guard<boost::mutex> lock(s_iomutex); 
      std::cout << "consumed " << rstr << "\n"; 
     } 
    } 

    private: 
    BlockingQueue<std::string> bqueue; 
    std::string inFile; // input file name from where the strings are read 
}; 

int main() { 
    FileProcessor fp(1); 
    boost::thread rt(boost::bind(&FileProcessor::read, &fp)); 
    boost::thread pt1(boost::bind(&FileProcessor::process, &fp)); 
    rt.join(); 

    pt1.interrupt(); 
    pt1.join(); 
} 

Prints

inserted 1 15786 
inserted 2 2099 
consumed 1 15786 
consumed 2 2099 
inserted 3 23963 
consumed 3 23963 
inserted 4 6928 
consumed 4 6928 
inserted 5 16279 
consumed 5 16279 
inserted 6 26787 
consumed 6 26787 
inserted 7 13463 
consumed 7 13463 
inserted 8 14099 
consumed 8 14099 
inserted 9 21808 
consumed 9 21808 
inserted 10 22618 
consumed 10 22618 
inserted 11 10618 
consumed 11 10618 
inserted 12 8211 
consumed 12 8211 
inserted 13 32033 
consumed 13 32033 
inserted 14 14512 
consumed 14 14512 
inserted 15 17734 
consumed 15 17734 
inserted 16 3632 
consumed 16 3632 
inserted 17 8265 
consumed 17 8265 
inserted 18 17922 
consumed 18 17922 
inserted 19 15753 
consumed 19 15753 
inserted 20 7474 
consumed 20 7474 
inserted 21 20136 
consumed 21 20136 
inserted 22 12334 
consumed 22 12334 
inserted 23 23299 
consumed 23 23299 
inserted 24 4066 
consumed 24 4066 
inserted 25 5173 
consumed 25 5173 
inserted 26 17640 
consumed 26 17640 
inserted 27 19218 
consumed 27 19218 
inserted 28 26387 
consumed 28 26387 
inserted 29 26357 
consumed 29 26357 
inserted 30 15206 
consumed 30 15206 
inserted 31 28714 
consumed 31 28714 
inserted 32 32648 
consumed 32 32648 
inserted 33 1500 
consumed 33 1500 
inserted 34 20941 
consumed 34 20941 
inserted 35 3838 
consumed 35 3838 
inserted 36 29680 
consumed 36 29680 
inserted 37 24626 
consumed 37 24626 
inserted 38 14824 
consumed 38 14824 
inserted 39 19690 
consumed 39 19690 
inserted 40 27815 
consumed 40 27815 
inserted 41 6760 
consumed 41 6760 
inserted 42 21322 
consumed 42 21322 
inserted 43 17966 
consumed 43 17966 
inserted 44 15292 
consumed 44 15292 
inserted 45 23321 
consumed 45 23321 
inserted 46 7437 
consumed 46 7437 
inserted 47 5444 
consumed 47 5444 
inserted 48 26785 
consumed 48 26785 
inserted 49 22430 
consumed 49 22430 
inserted 50 25417 
consumed 50 25417 
inserted 51 10408 
consumed 51 10408 
inserted 52 32096 
consumed 52 32096 
inserted 53 489 
consumed 53 489 
inserted 54 7083 
consumed 54 7083 
inserted 55 21555 
consumed 55 21555 
inserted 56 3759 
consumed 56 3759 
inserted 57 20811 
consumed 57 20811 
inserted 58 20176 
consumed 58 20176 
inserted 59 31305 
consumed 59 31305 
inserted 60 9894 
consumed 60 9894 
inserted 61 5515 
consumed 61 5515 
inserted 62 9978 
consumed 62 9978 
inserted 63 1981 
consumed 63 1981 
inserted 64 22286 
consumed 64 22286 
inserted 65 11081 
consumed 65 11081 
inserted 66 4392 
consumed 66 4392 
inserted 67 2252 
consumed 67 2252 
inserted 68 16714 
consumed 68 16714 
inserted 69 16003 
consumed 69 16003 
inserted 70 16695 
consumed 70 16695 
inserted 71 11288 
consumed 71 11288 
inserted 72 4788 
consumed 72 4788 
inserted 73 14454 
consumed 73 14454 
inserted 74 29920 
consumed 74 29920 
inserted 75 25154 
consumed 75 25154 
inserted 76 6206 
consumed 76 6206 
inserted 77 14444 
consumed 77 14444 
inserted 78 2921 
consumed 78 2921 
inserted 79 26908 
consumed 79 26908 
inserted 80 24148 
consumed 80 24148 
inserted 81 8487 
consumed 81 8487 
inserted 82 11371 
consumed 82 11371 
inserted 83 31047 
consumed 83 31047 
inserted 84 27749 
consumed 84 27749 
inserted 85 13548 
consumed 85 13548 
inserted 86 13807 
consumed 86 13807 
inserted 87 9411 
consumed 87 9411 
inserted 88 21999 
consumed 88 21999 
inserted 89 24386 
consumed 89 24386 
inserted 90 10190 
consumed 90 10190 
inserted 91 2472 
consumed 91 2472 
inserted 92 17149 
consumed 92 17149 
inserted 93 14288 
consumed 93 14288 
inserted 94 31625 
consumed 94 31625 
inserted 95 4732 
consumed 95 4732 
inserted 96 20273 
consumed 96 20273 
inserted 97 29036 
consumed 97 29036 
inserted 98 4425 
consumed 98 4425 
inserted 99 1563 
consumed 99 1563 
inserted 100 2796 
consumed 100 2796 
inserted 101 24374 
consumed 101 24374 
inserted 102 8151 
consumed 102 8151 
inserted 103 31361 
consumed 103 31361 
inserted 104 22466 
consumed 104 22466 
inserted 105 23365 
consumed 105 23365 
inserted 106 23762 
consumed 106 23762 
inserted 107 3616 
consumed 107 3616 
inserted 108 7711 
consumed 108 7711 
inserted 109 23178 
consumed 109 23178 
inserted 110 18791 
consumed 110 18791 
inserted 111 13371 
consumed 111 13371 
inserted 112 14553 
consumed 112 14553 
inserted 113 32026 
consumed 113 32026 
inserted 114 4567 
consumed 114 4567 
inserted 115 22178 
consumed 115 22178 
inserted 116 23947 
inserted 117 5928 
consumed 116 23947 
consumed 117 5928 
inserted 118 25606 
consumed 118 25606 
inserted 119 5141 
consumed 119 5141 
inserted 120 17681 
consumed 120 17681 
inserted 121 8024 
consumed 121 8024 
inserted 122 9094 
consumed 122 9094 
inserted 123 24878 
consumed 123 24878 
inserted 124 27800 
consumed 124 27800 
inserted 125 10225 
consumed 125 10225 
inserted 126 1157 
consumed 126 1157 
inserted 127 28217 
consumed 127 28217 
inserted 128 15144 
consumed 128 15144 
inserted 129 25692 
consumed 129 25692 
inserted 130 250 
consumed 130 250 
inserted 131 17432 
consumed 131 17432 
inserted 132 10055 
consumed 132 10055 
inserted 133 24279 
consumed 133 24279 
inserted 134 9445 
consumed 134 9445 
inserted 135 4149 
consumed 135 4149 
inserted 136 23240 
consumed 136 23240 
inserted 137 23146 
consumed 137 23146 
inserted 138 8576 
consumed 138 8576 
inserted 139 11469 
consumed 139 11469 
inserted 140 27250 
consumed 140 27250 
inserted 141 12203 
consumed 141 12203 
inserted 142 21730 
consumed 142 21730 
inserted 143 30824 
consumed 143 30824 
inserted 144 11197 
consumed 144 11197 
inserted 145 11076 
consumed 145 11076 
inserted 146 6960 
consumed 146 6960 
inserted 147 7313 
consumed 147 7313 
inserted 148 16701 
consumed 148 16701 
inserted 149 21044 
consumed 149 21044 
inserted 150 9934 
consumed 150 9934 
inserted 151 18562 
consumed 151 18562 
inserted 152 3559 
consumed 152 3559 
inserted 153 5541 
consumed 153 5541 
inserted 154 16024 
consumed 154 16024 
inserted 155 9877 
consumed 155 9877 
inserted 156 18443 
consumed 156 18443 
inserted 157 6312 
consumed 157 6312 
inserted 158 24237 
consumed 158 24237 
inserted 159 27685 
consumed 159 27685 
inserted 160 6154 
consumed 160 6154 
inserted 161 32723 
consumed 161 32723 
inserted 162 8358 
consumed 162 8358 
inserted 163 5518 
consumed 163 5518 
inserted 164 15857 
consumed 164 15857 
inserted 165 26383 
consumed 165 26383 
inserted 166 13179 
consumed 166 13179 
inserted 167 29919 
consumed 167 29919 
inserted 168 5135 
consumed 168 5135 
inserted 169 7147 
consumed 169 7147 
inserted 170 4383 
consumed 170 4383 
inserted 171 13147 
consumed 171 13147 
inserted 172 15658 
consumed 172 15658 
inserted 173 18478 
consumed 173 18478 
inserted 174 29793 
consumed 174 29793 
inserted 175 16003 
consumed 175 16003 
inserted 176 12804 
consumed 176 12804 
inserted 177 25713 
consumed 177 25713 
inserted 178 28108 
consumed 178 28108 
inserted 179 8518 
consumed 179 8518 
inserted 180 9874 
consumed 180 9874 
inserted 181 30731 
consumed 181 30731 
inserted 182 15582 
consumed 182 15582 
inserted 183 12589 
consumed 183 12589 
inserted 184 15839 
consumed 184 15839 
inserted 185 19505 
consumed 185 19505 
inserted 186 20543 
consumed 186 20543 
inserted 187 6331 
consumed 187 6331 
inserted 188 25289 
consumed 188 25289 
inserted 189 14877 
consumed 189 14877 
inserted 190 25571 
consumed 190 25571 
inserted 191 10873 
consumed 191 10873 
inserted 192 13568 
consumed 192 13568 
inserted 193 16319 
consumed 193 16319 
inserted 194 28590 
consumed 194 28590 
inserted 195 22303 
consumed 195 22303 
inserted 196 20685 
consumed 196 20685 
inserted 197 1528 
consumed 197 1528 
inserted 198 5200 
consumed 198 5200 
inserted 199 25689 
consumed 199 25689 
inserted 200 25140 
consumed 200 25140 
+0

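Hi @sehe. From your answer and user n.m's comment I understand that the implementation may change depending on what I want to achieve with the blocking queue. I wanted to test a single producer and a single consumer taking turns accessing the queue while the threads are encapsulated in a class. Your approach looks neat, thanks! I have removed the inaccurate comment, by the way. I will try memory-mapped IO next. – freax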

+0

If what you want is task-based processing, consider Asio: http://coliru.stacked-crooked.com/a/7e9361f5397c8e8e (multi-threaded and single-threaded demos) – sehe

+1

Oh, and here is a demo with 2 FileProcessor instances, still single-threaded: http://coliru.stacked-crooked.com/a/1cffb328e414a38e – sehe