在C++ 11,I具有其管理一个数字,经由单一lambda函数排队的线程的线程池对象。我知道我需要处理多少行数据,所以我提前知道我需要对N个作业进行排队。我不确定的是如何判断所有这些工作何时完成,因此我可以继续下一步。如何判断我的ThreadPool何时完成其任务?
这是管理的线程池代码:
#include <cstdlib>
#include <vector>
#include <deque>
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
class ThreadPool;
class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
};
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
~ThreadPool();
void joinAll();
int taskSize();
private:
friend class Worker;
// the task queue
std::deque< std::function<void()> > tasks;
// keep track of threads
std::vector<std::thread> workers;
// sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
void Worker::operator()()
{
std::function<void()> task;
while(true)
{
{ // acquire lock
std::unique_lock<std::mutex>
lock(pool.queue_mutex);
// look for a work item
while (!pool.stop && pool.tasks.empty()) {
// if there are none wait for notification
pool.condition.wait(lock);
}
if (pool.stop) {// exit if the pool is stopped
return;
}
// get the task from the queue
task = pool.tasks.front();
pool.tasks.pop_front();
} // release lock
// execute the task
task();
}
}
// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0;i<threads;++i) {
workers.push_back(std::thread(Worker(*this)));
}
//workers.
//tasks.
}
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
// stop all threads
stop = true;
condition.notify_all();
// join them
for (size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
void ThreadPool::joinAll() {
// join them
for (size_t i = 0;i<workers.size();++i) {
workers[i].join();
}
}
int ThreadPool::taskSize() {
return tasks.size();
}
// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
{ // acquire lock
std::unique_lock<std::mutex> lock(queue_mutex);
// add the task
tasks.push_back(std::function<void()>(f));
} // release lock
// wake up one thread
condition.notify_one();
}
然后分发我的工作线程之间是这样的:
ThreadPool pool(4);
/* ... */
for (int y=0;y<N;y++) {
pool->enqueue([this,y] {
this->ProcessRow(y);
});
}
// wait until all threads are finished
std::this_thread::sleep_for(std::chrono::milliseconds(100));
等待100毫秒的作品,只是因为我知道这些工作可以完成时间少于100毫秒,但显然它不是最好的方法。一旦完成了N行处理,它需要经历另外1000代左右的相同事情。显然,我想尽快开始下一代。
我知道一定有某种方式将代码添加到我的线程池,这样我可以做这样的事情:
while (pool->isBusy()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
我一直在做这个了几晚,现在我觉得很难找到如何做到这一点的好例子。 那么,这将是对我是否履行isBusy()方法的正确方法?
使用条件变量和标志。等待条件变量并在其谓词中测试该标志。 –
我想问一下,如果你考虑使用像英特尔的Threading Buildings Blocks这样的东西?也许在BOOST中有很多有用的东西,而且微软也有自己的库。创建自己的线程池通常是最后的手段,以防万一你真的需要现有的线程池不能提供的东西,像英特尔的TBB这样的库也不会那样做。但在这里,我可以想象,这些任务将安排在TBB上,然后等待... – ipavlu
@ipavlu,它可能使用C++ 11编写多线程代码而无需额外的库。我试图避免额外的大量依赖。我解决了这个问题,并提供了我自己的答案。 – Octopus