2016-07-20 149 views
1

我正试图用C++ 11并发支持来做到这一点。正确的方式来等待由多个线程通知的条件变量

我有一个线程池的工作线程都做同样的事情,其中​​一个主线程有一个条件变量数组(每个线程一个,他们需要'开始'同步,即不运行一个循环)。

for (auto &worker_cond : cond_arr) { 
     worker_cond.notify_one(); 
    } 

然后,此线程必须等待池的每个线程的通知再次重新启动其周期。什么是这样做的正确方法?有一个单一的条件变量,并等待一些整数每个线程是不是主要增加?像(仍然在主线程)

unique_lock<std::mutex> lock(workers_mtx); 
    workers_finished.wait(lock, [&workers] { return workers = cond_arr.size(); }); 
+1

快速问答:你有没有考虑过['std :: experimental :: barrier'](http://en.cppreference.com/w/cpp/experimental/barrier)? –

回答

1

我看到两个选项:

选项1:join()

基本上而不是使用条件变量在你的线程开始计算,你为每次迭代产生一个新线程,并使用join()等待它完成。然后你为下一次迭代产生新的线程,以此类推。

选项2:锁定

你不想要,只要一个线程仍在工作的主线程通知。所以每个线程都有自己的锁,在执行计算之前锁定它,然后解锁。你的主线程在调用notify()之前锁定它们,并在之后解锁它们。

+0

对于第一个,不会更容易使用。第二个不起作用,因为它可能发生,当主线程正在等待某个具有不同锁的其他线程的锁时,可以循环多次,这是我不想要的! – Aram

+0

如果在每个循环之后工作线程释放锁,然后等待条件变量并且只有在通知重新锁定并且执行一个周期时才会发生。 – Anedar

1

我没有看到你的解决方案没有根本的错误。

防护workersworkers_mtx并完成。

我们可以用计数信号对此进行抽象。

struct counting_semaphore { 
    std::unique_ptr<std::mutex> m=std::make_unique<std::mutex>(); 
    std::ptrdiff_t count = 0; 
    std::unique_ptr<std::condition_variable> cv=std::make_unique<std::condition_variable>(); 

    counting_semaphore(std::ptrdiff_t c=0):count(c) {} 
    counting_semaphore(counting_semaphore&&)=default; 

    void take(std::size_t n = 1) { 
    std::unique_lock<std::mutex> lock(*m); 
    cv->wait(lock, [&]{ if (count-std::ptrdiff_t(n) < 0) return false; count-=n; return true; }); 
    } 
    void give(std::size_t n = 1) { 
    { 
     std::unique_lock<std::mutex> lock(*m); 
     count += n; 
     if (count <= 0) return; 
    } 
    cv->notify_all(); 
    } 
}; 

take需要count路程,块如果没有足够的。

give增加到count,并通知是否有正数量。

现在工作者在两个信号量之间线程化渡轮令牌。

std::vector<counting_semaphore> m_worker_start{count}; 
counting_semaphore m_worker_done{0}; // not count, zero 
std::atomic<bool> m_shutdown = false; 

// master controller: 
for (each step) { 
    for (auto&& starts:m_worker_start) 
    starts.give(); 
    m_worker_done.take(count); 
} 

// master shutdown: 
m_shutdown = true; 
// wake up forever: 
for (auto&& starts:m_worker_start) 
    starts.give(std::size_t(-1)/2); 

// worker thread: 
while (true) { 
    master->m_worker_start[my_id].take(); 
    if (master->m_shutdown) return; 
    // do work 
    master->m_worker_done.give(); 
} 

或某些情况。

live example