2015-11-11 33 views
3

在C++ 11,I具有其管理一个数字,经由单一lambda函数排队的线程的线程池对象。我知道我需要处理多少行数据,所以我提前知道我需要对N个作业进行排队。我不确定的是如何判断所有这些工作何时完成,因此我可以继续下一步。如何判断我的ThreadPool何时完成其任务?

这是管理的线程池代码:

#include <cstdlib> 
#include <vector> 
#include <deque> 
#include <iostream> 
#include <atomic> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 

class ThreadPool; 

class Worker { 
public: 
    Worker(ThreadPool &s) : pool(s) { } 
    void operator()(); 
private: 
    ThreadPool &pool; 
}; 

class ThreadPool { 
public: 
    ThreadPool(size_t); 
    template<class F> 
    void enqueue(F f); 
    ~ThreadPool(); 
    void joinAll(); 
    int taskSize(); 

private: 
    friend class Worker; 

    // the task queue 
    std::deque< std::function<void()> > tasks; 

    // keep track of threads 
    std::vector<std::thread> workers; 

    // sync 
    std::mutex queue_mutex; 
    std::condition_variable condition; 
    bool stop; 
}; 

void Worker::operator()() 
{ 
    std::function<void()> task; 
    while(true) 
    { 
     { // acquire lock 
      std::unique_lock<std::mutex> 
       lock(pool.queue_mutex); 

      // look for a work item 
      while (!pool.stop && pool.tasks.empty()) { 
       // if there are none wait for notification 
       pool.condition.wait(lock); 
      } 

      if (pool.stop) {// exit if the pool is stopped 
       return; 
      } 

      // get the task from the queue 
      task = pool.tasks.front(); 
      pool.tasks.pop_front(); 

     } // release lock 

     // execute the task 
     task(); 
    } 
} 


// the constructor just launches some amount of workers 
ThreadPool::ThreadPool(size_t threads) 
    : stop(false) 
{ 
    for (size_t i = 0;i<threads;++i) { 
     workers.push_back(std::thread(Worker(*this))); 
    } 

    //workers. 
    //tasks. 
} 

// the destructor joins all threads 
ThreadPool::~ThreadPool() 
{ 
    // stop all threads 
    stop = true; 
    condition.notify_all(); 

    // join them 
    for (size_t i = 0;i<workers.size();++i) { 
     workers[i].join(); 
    } 
} 

void ThreadPool::joinAll() { 
    // join them 
    for (size_t i = 0;i<workers.size();++i) { 
     workers[i].join(); 
    } 
} 

int ThreadPool::taskSize() { 
    return tasks.size(); 
} 

// add new work item to the pool 
template<class F> 
void ThreadPool::enqueue(F f) 
{ 
    { // acquire lock 
     std::unique_lock<std::mutex> lock(queue_mutex); 

     // add the task 
     tasks.push_back(std::function<void()>(f)); 
    } // release lock 

    // wake up one thread 
    condition.notify_one(); 
} 

然后分发我的工作线程之间是这样的:

ThreadPool pool(4); 
/* ... */ 
for (int y=0;y<N;y++) { 
    pool->enqueue([this,y] { 
     this->ProcessRow(y); 
    }); 
} 

// wait until all threads are finished 
std::this_thread::sleep_for(std::chrono::milliseconds(100)); 

等待100毫秒的作品,只是因为我知道这些工作可以完成时间少于100毫秒,但显然它不是最好的方法。一旦完成了N行处理,它需要经历另外1000代左右的相同事情。显然,我想尽快开始下一代。

我知道一定有某种方式将代码添加到我的线程池,这样我可以做这样的事情:

while (pool->isBusy()) { 
    std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
} 

我一直在做这个了几晚,现在我觉得很难找到如何做到这一点的好例子。 那么,这将是对我是否履行isBusy()方法的正确方法?

+1

使用条件变量和标志。等待条件变量并在其谓词中测试该标志。 –

+0

我想问一下,如果你考虑使用像英特尔的Threading Buildings Blocks这样的东西?也许在BOOST中有很多有用的东西,而且微软也有自己的库。创建自己的线程池通常是最后的手段,以防万一你真的需要现有的线程池不能提供的东西,像英特尔的TBB这样的库也不会那样做。但在这里,我可以想象,这些任务将安排在TBB上,然后等待... – ipavlu

+0

@ipavlu,它可能使用C++ 11编写多线程代码而无需额外的库。我试图避免额外的大量依赖。我解决了这个问题,并提供了我自己的答案。 – Octopus

回答

1

您可能需要创建具有布尔变量标志相关联的线程的内部结构。

class ThreadPool { 
private: 
    // This Structure Will Keep Track Of Each Thread's Progress 
    struct ThreadInfo { 
     std::thread thread; 
     bool  isDone; 

     ThreadInfo(std::thread& threadIn) : 
      thread(threadIn), isDone(false) 
     {} 
    }; // ThredInfo 

    // This Vector Should Be Populated In The Constructor Initially And 
    // Updated Anytime You Would Add A New Task. 
    // This Should Also Replace // std::vector<std::thread> workers 
    std::vector<ThreadInfo> workers; 

public: 
    // The rest of your class would appear to be the same, but you need a 
    // way to test if a particular thread is currently active. When the 
    // thread is done this bool flag would report as being true; 

    // This will only return or report if a particular thread is done or not 
    // You would have to set this variable's flag for a particular thread to 
    // true when it completes its task, otherwise it will always be false 
    // from moment of creation. I did not add in any bounds checking to keep 
    // it simple which should be taken into consideration. 
    bool isBusy(unsigned idx) const { 
     return workers[idx].isDone; 
    } 
}; 
+0

如果将有比线程更多的作业并且线程池必须通过接收到的作业慢慢地烧掉会怎么样? – ipavlu

+0

@ipavlu我明白你在说什么;但我正在回答所问的问题。用户问什么是我实施他们的'isBusy()'方法的正确方法。用户并没有问什么是最好的解决方案。但是,由于这可能比其他方法更昂贵,因此它将bool标志与特定线程相关联。这仅表示线程是否处于活动状态。如果每个布尔旗都是真的,那么他们可以继续下一步。 –

0

如果你有N个就业机会,他们必须通过调用线程睡眠被期待已久的,那么最有效的方法是创建某处一个变量,会被一个原子操作调度作业前设置为N并且在完成计算后的每个作业中,都会有变量的原子减量。然后你可以使用原子指令来测试变量是否为零。

或锁定减量与等待句柄,当变量将递减至零。

我必须说,我不喜欢这个想法,你所要求的:

while (pool->isBusy()) { 
    std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
} 

它只是不合身,也不会是1ms的几乎没有,它是利用资源等不必要的...

最好的方法是原子方式减小一些变量,并测试原子变量,如果全部完成,最后的工作会简单地基于原子测试设置WaitForSingleObject的。 以及是否必须,等待的将是WaitForSingleObject的,并会结束后醒来,次数并不多。

WaitForSingleObject

2

我知道了!

首先,我介绍了一些额外的成员ThreadPool类:

class ThreadPool { 
    /* ... exisitng code ... */ 
    /* plus the following */ 
    std::atomic<int> njobs_pending; 
    std::mutex main_mutex; 
    std::condition_variable main_condition; 
} 

现在,我可以做的比检查一些状态的时间每X量更好。现在,我可以阻止主循环,直到没有更多的就业机会正在等待:

void ThreadPool::waitUntilCompleted(unsigned n) { 
    std::unique_lock<std::mutex> lock(main_mutex); 
    main_condition.wait(lock); 
} 

只要我管理什么的等待与下面的代码簿记,在ThreadPool.enqueue()函数的开始:

njobs_pending++; 

,并在之后我运行在工任务::运算符()()函数:

if (--pool.njobs_pending == 0) { 
    pool.main_condition.notify_one(); 
} 

然后主线程可以排队任何任务是必要的,然后坐下来,等到所有的C通过以下方式完成:

for (int y=0;y<N;y++) { 
    pool->enqueue([this,y] { 
     this->ProcessRow(y); 
    }); 
} 
pool->waitUntilCompleted(); 
相关问题