2015-10-07 110 views
1

嗯,我需要帮助。我正在尝试做一些具体的事情,而我缺乏多线程技能正在让我失望。C++主线程通知线程通知主线程

基本上我的主程序/线程需要管理一些必须运行多次的“通道”。由于这些运行是独立的,每个通道都包含一个执行它们的线程。

因此,主线程必须等待所有通道(线程)完成其运行才能启动下一个通道。 并且所有频道都必须等待主线程可以运行的通知。

以下是我如何做到的 - 抱歉有点长!

#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 
#include <atomic> 

std::mutex    g_lockprint; 
std::mutex    g_lockbatch; 
std::condition_variable g_nextbatch; 
std::mutex    g_lockready; 
std::condition_variable g_ready; 

int global_id = 0; 
int nbChannels = 5; 
std::atomic<int> nbChannelsLeftToEnd; 

class Channel { 

private: 

    int _id; 
    std::thread _th; 
    std::atomic<bool> next_batch; 
    std::atomic<bool> stop_th; 

public: 

    Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {} 

    void go_for_next_batch() { next_batch = true; } 

    void start(int& start, int &end){ 
     _th = std::thread(&Channel::run, this, std::ref(start), std::ref(end)); 
    } 

    void stop(){ 
     stop_th = true; 
     _th.join(); 
    } 

    void run(int& start, int& end){ 
     while (!stop_th){ 
      { 
       std::unique_lock<std::mutex> locker(g_lockbatch); 
       g_nextbatch.wait(locker, [&](){return (next_batch==true); }); 
      } 

      // print a starting message 
      { 
       std::unique_lock<std::mutex> locker(g_lockprint); 
       std::cout << "[channel " << _id << "]\trunning in [" << start << "," << end << "]" << std::endl; 
      } 

      // simulate work 
      std::this_thread::sleep_for(std::chrono::seconds(1)); 

      // update the number of channels left to run 
      nbChannelsLeftToEnd--; 
      g_ready.notify_one(); 
      next_batch = false; 
     } 
    } 
}; 

int main() 
{ 
    int end = 100; 
    int batch = 10; 
    int startBatch = 0; 
    int endBatch = startBatch + batch; 

    // declare some channels (threads) 
    std::vector<Channel> channels(nbChannels); 

    // start the threads 
    for (auto& ch : channels) ch.start(startBatch, endBatch); 

    while (endBatch<=end){ 
     { 
      std::unique_lock<std::mutex> locker(g_lockprint); 
      std::cout << "[main]\trunning in [" << startBatch << "," << endBatch << "]" << std::endl; 
     } 
     nbChannelsLeftToEnd = nbChannels; 
     for (auto& ch : channels) ch.go_for_next_batch(); 
     g_nextbatch.notify_all(); 

     std::unique_lock<std::mutex> locker(g_lockready); 
     g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); }); 

     startBatch += batch; 
     endBatch += batch; 
    } 

    for (auto& ch : channels) ch.stop(); 

    return 0; 
} 

但有时程序块,可能线程彼此等待,但我看不出为什么。 在任何情况下,加入线程(主要结束处的“stop”方法)会使我的程序无限期地运行,看不出为什么。

编辑:感谢您的意见和一些研究,我设法使用同步屏障获得工作程序,以便主线程可以等待所有其他线程完成当前批处理,然后再告诉他们开始下一个线程。 我重用从别人这里被引用Anthony Wiiliams's book阻隔码 - 这里的屏障

class barrier 
{ 
    unsigned const count; 
    std::atomic<unsigned> spaces; 
    std::atomic<unsigned> generation; 

public: 
    explicit barrier(unsigned count_) : 
     count(count_), spaces(count), generation(0) {} 

    void wait() 
    { 
     unsigned const my_generation = generation; 
     if (!--spaces) 
     { 
      spaces = count; 
      ++generation; 
     } 
     else 
     { 
      while (generation == my_generation) 
       std::this_thread::yield(); 
     } 
    } 
}; 

下面是使用屏障通道类新运行方法 - 注意附加测试的“stop_th “国旗。当线程在最后一批之后并且在被连接之前被解锁时,它不应该运行另一批次,因此该测试。

void run(int& start, int& end, barrier& b) 
{ 
    while (!stop_th){ 
     // wait for next batch notification - use the next_batch flag to avoid 
     // spurious wake-ups 
     { 
      std::unique_lock<std::mutex> locker(g_lockbatch); 
      g_nextbatch.wait(locker, [&](){return (next_batch==true); }); 
     } 

     if (stop_th) return; 

     // simulate work 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 

     // wait for everyone to meet 
     next_batch = false; 
     b.wait(); 
    } 
} 

最后这里的主要

int main() 
{ 
    int end = 100; 
    int batch = 10; 
    int startBatch = 0; 
    int endBatch = startBatch + batch; 

    // declare a barrier where all threads will meet 
    barrier b(nbChannels+1); 

    // declare some channels (threads) 
    std::vector<Channel> channels(nbChannels); 

    // start the threads 
    for (auto& ch : channels) ch.start(startBatch, endBatch, b); 

    while (endBatch<=end){ 

     // notify the channels they can process one batch 
     for (auto& ch : channels) ch.go_for_next_batch(); 
     g_nextbatch.notify_all(); 

     // wait until all threads have finished their batch 
     b.wait(); 

     // prepare the next one 
     startBatch += batch; 
     endBatch += batch; 
    } 

    // all channels are blocked by the next_batch condition 
    // so notify a next batch and join them 
    for (auto& ch : channels) ch.stop(); 
    for (auto& ch : channels) ch.go_for_next_batch(); 
    g_nextbatch.notify_all(); 
    for (auto& ch : channels) ch.wait_until_stopped(); 

    return 0; 
} 

再次感谢您的所有意见/答案!

+0

您应该能够调试器连接到现场处理(如:'GDB '在Linux上),并列出的当前状态线程。我发现这通常能够很好地说明造成问题的原因。我怀疑你应该尝试坚持一个互斥锁或者只按照严格的顺序锁定/解锁。 –

回答

0

由于我在cpp.sh中修复了代码,现在看起来完成了,所以我将我的评论更改为了答案。

关于它们在呼叫停止时不存在。 请注意,它们可能仍然卡在等待下一个批次锁定。 考虑添加一个调用来将它们从锁中释放,并让它们检查它们是否在锁定步骤后停止。

将stop函数分成两个函数,一个是更改布尔值,另一个是等待的地方。 可以调用两个函数stop和wait_until_stopped

然后将下面的代码添加到主函数中。

而不是

for (auto& ch : channels) ch.stop(); 

用途:

for (auto& ch : channels) ch.stop(); 

for (auto& ch : channels) ch.go_for_next_batch(); 

g_nextbatch.notify_all(); 

for (auto& ch : channels) ch.wait_until_stopped(); 
+0

+1!确实线程正在等待下一批,您的解决方案完美无缺!好吧,至少当我到达那一点时,程序经常会在此之前冻结,因此可能会留下一些问题... – pinch2k4

+0

如果程序中有冻结,则很可能是死锁。 锁之间的竞赛。 附上一个调试器,看看谁在等待@Component 10建议的人。 – Jonathan