2015-06-22 92 views
3

这是一个简单的C++线程池实现。它是从https://github.com/progschj/ThreadPool开始的改变版本。std :: condition_variable - 通知一次,但等待线程被唤醒两次

#ifndef __THREAD_POOL_H__ 
#define __THREAD_POOL_H__ 

#include <vector> 
#include <queue> 
#include <memory> 
#include <thread> 
#include <chrono> 
#include <mutex> 
#include <condition_variable> 
#include <future> 
#include <functional> 
#include <stdexcept> 

namespace ThreadPool { 

class FixedThreadPool { 
public: 
    FixedThreadPool(size_t); 

    template<class F, class... Args> 
    auto Submit(F&& f, Args&&... args) 
     -> std::future<typename std::result_of<F(Args...)>::type>; 

    template<class F, class... Args> 
    void Execute(F&& f, Args&&... args); 

    ~FixedThreadPool(); 

    void AwaitTermination(); 

    void Stop(); 

private: 
    void ThreadWorker(); 

    // need to keep track of threads so we can join them 
    std::vector<std::thread> workers; 

    // the task queue 
    std::queue< std::function<void()> > tasks; 

    // synchronization 
    std::mutex worker_mutex; 
    std::mutex queue_mutex; 
    std::condition_variable condition; 

    // stop flag 
    bool stop_; 

    // thread size 
    int thread_size_; 
}; 

// Constructor does nothing. Threads are created when new task submitted. 
FixedThreadPool::FixedThreadPool(size_t num_threads): 
    stop_(false), 
    thread_size_(num_threads) {} 

// Destructor joins all threads 
FixedThreadPool::~FixedThreadPool() { 
    //std::this_thread::sleep_for(std::chrono::seconds(5)); 
    for(std::thread &worker: workers) { 
    if (worker.joinable()) { 
     worker.join(); 
    } 
    } 
} 

// Thread worker 
void FixedThreadPool::ThreadWorker() { 
    std::function<void()> task; 
    while (1) { 
    { 
     std::unique_lock<std::mutex> lock(this->queue_mutex); 
     this->condition.wait(lock, 
        [this]() { return this->stop_ || !this->tasks.empty(); }); 
     printf("wakeeeeeened\n"); 
     if (this->stop_ && this->tasks.empty()) { 
     printf("returning ...\n"); 
     return; 
     } 
     task = std::move(this->tasks.front()); 
     this->tasks.pop(); 
    } 
    task(); 
    } 
} 

// Add new work item to the pool 
template<class F, class... Args> 
auto FixedThreadPool::Submit(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type > 
{ 
    { 
    std::unique_lock<std::mutex> lock(this->worker_mutex); 
    if (workers.size() < thread_size_) { 
     workers.emplace_back(std::thread(&FixedThreadPool::ThreadWorker, this)); 
    } 
    } 

    using return_type = typename std::result_of<F(Args...)>::type; 

    auto task = std::make_shared< std::packaged_task<return_type()> >(
     std::bind(std::forward<F>(f), std::forward<Args>(args)...) 
); 

    std::future<return_type> res = task->get_future(); 
    { 
    std::unique_lock<std::mutex> lock(queue_mutex); 
    if(stop_) { 
     throw std::runtime_error("ThreadPool has been shutdown."); 
    } 
    tasks.emplace([task]() { (*task)(); }); 
    } 
    condition.notify_one(); 
    return res; 
} 

// Execute new task without returning std::future object. 
template<class F, class... Args> 
void FixedThreadPool::Execute(F&& f, Args&&... args) { 
    Submit(std::forward<F>(f), std::forward<Args>(args)...); 
} 

// Blocks and wait for all previously submitted tasks to be completed. 
void FixedThreadPool::AwaitTermination() { 
    for(std::thread &worker: workers) { 
    if (worker.joinable()) { 
     worker.join(); 
    } 
    } 
} 

// Shut down the threadpool. This method does not wait for previously submitted 
// tasks to be completed. 
void FixedThreadPool::Stop() { 
    printf("Stopping ...\n"); 
    { 
    std::unique_lock<std::mutex> lock(queue_mutex); 
    stop_ = true; 
    } 
} 


} // namespace ThreadPool 

#endif /* __THREAD_POOL_H__ */ 

和测试的main.cpp:

#include <iostream> 
#include <vector> 
#include <chrono> 
#include <exception> 

#include "ThreadPool.h" 

int main(int argc, char** argv) { 
    ThreadPool::FixedThreadPool pool(3); 

    pool.Execute([]() { 
     std::cout << "hello world" << std::endl; 
    } 
); 
    pool.Stop(); 
    pool.AwaitTermination(); 
    std::cout << "All tasks complted." << std::endl; 

    return 0; 
} 

我在这个测试程序中的错误。只有一个任务被提交到线程池,但我的工作线程被唤醒了两声:

>>./test 
Stopping ... 
wakeeeeeened 
hello world 
wakeeeeeened 
returning ... 
All tasks complted. 

我认为这个问题是在FixedThreadPool :: ThreadWorker()本身。工作线程持续等待条件变量以获取新任务。函数FixedThreadPool :: Submit()将新任务添加到队列并调用condition.nofity_one()来唤醒正在工作的线程。

但我无法弄清楚工作线程如何被唤醒两次。我只有一个任务提交,并且在此测试中只有一个工作线程。

+0

可能与https://en.wikipedia.org/wiki/Spurious_wakeup – cppguy

+0

@cppguy不符合谓词版本。 –

+2

OT:你的包括守卫*非常*非法! http://stackoverflow.com/questions/228783/what-are-the-rules-about-using-an-underscore-in-a-c-ntifntifier –

回答

3

转换注释到答案:

condition_variable::wait(lock, pred)相当于while(!pred()) wait(lock);。如果pred()返回true那么实际上不会发生等待并且该呼叫立即返回。

你的第一次唤醒是来自notify_one()的呼叫;第二个“唤醒”是因为第二个wait()调用恰好在Stop()调用后执行,因此您的谓词返回truewait()而不等待。

很明显,您在这里得到了(非)幸运:如果第二个wait()调用发生在Stop()调用之前,那么您的工作线程将永久等待(在没有虚假唤醒的情况下),因此将你的主线程。另外,摆脱__THREAD_POOL_H__。将这些双下划线烧到地上。

相关问题