Example of a dynamic thread pool in boost::asio

I am going to implement a boost::asio server using an io_service with a thread pool (as in the HTTP Server 3 example). The io_service will be bound to a unix domain socket and will pass requests arriving over connections on this socket to different threads. To reduce resource consumption, I would like to make the thread pool dynamic.

Here is the concept. Initially a single thread is created. When a request arrives and the server finds that there is no free thread in the pool, it creates a new thread and hands the request to it. The server can create up to some maximum number of threads. Ideally, it would also be able to suspend threads that have been idle for some period of time.

Has anyone built something similar? Or does anyone have a relevant example?

As for me, I suppose I should somehow override io_service.dispatch to achieve this.

Answer

There are a few challenges with the initial approach:

  • boost::asio::io_service is not intended to be derived from or reimplemented. Note the lack of virtual functions.
  • If your threading library does not provide the ability to query a thread's state, then the state information needs to be managed separately.

An alternative solution is to post a job into the io_service and then check how long it sat in the io_service. If the time delta between when it was ready to run and when it was actually run exceeds a certain threshold, this indicates that there are more jobs in the queue than there are threads servicing the queue. The major benefit of this is that the logic for growing the thread pool stays decoupled from the rest of the logic.

Here is an example that accomplishes this with a deadline_timer.

  • Set the deadline_timer to expire 3 seconds from now.
  • Asynchronously wait on the deadline_timer. The handler will be ready to run 3 seconds after the deadline_timer was set.
  • In the asynchronous handler, check the current time relative to when the timer was set to expire. If it is greater than 2 seconds, the io_service queue is backing up, so add a thread to the thread pool.

Example:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
#include <iostream>

class thread_pool_checker
    : private boost::noncopyable
{
public:

    thread_pool_checker(boost::asio::io_service& io_service,
                        boost::thread_group& threads,
                        unsigned int max_threads,
                        long threshold_seconds,
                        long periodic_seconds)
        : io_service_(io_service),
          timer_(io_service),
          threads_(threads),
          max_threads_(max_threads),
          threshold_seconds_(threshold_seconds),
          periodic_seconds_(periodic_seconds)
    {
        schedule_check();
    }

private:

    void schedule_check();
    void on_check(const boost::system::error_code& error);

private:

    boost::asio::io_service& io_service_;
    boost::asio::deadline_timer timer_;
    boost::thread_group& threads_;
    unsigned int max_threads_;
    long threshold_seconds_;
    long periodic_seconds_;
};

void thread_pool_checker::schedule_check()
{
    // Thread pool is already at max size.
    if (max_threads_ <= threads_.size())
    {
        std::cout << "Thread pool has reached its max. Example will shutdown."
                  << std::endl;
        io_service_.stop();
        return;
    }

    // Schedule check to see if pool needs to increase.
    std::cout << "Will check if pool needs to increase in "
              << periodic_seconds_ << " seconds." << std::endl;
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_));
    timer_.async_wait(
        boost::bind(&thread_pool_checker::on_check, this,
                    boost::asio::placeholders::error));
}

void thread_pool_checker::on_check(const boost::system::error_code& error)
{
    // On error, return early.
    if (error) return;

    // Check how long this job was waiting in the service queue. This
    // returns the expiration time relative to now. Thus, if it expired
    // 7 seconds ago, then the delta time is -7 seconds.
    boost::posix_time::time_duration delta = timer_.expires_from_now();
    long wait_in_seconds = -delta.seconds();

    // If the time delta is greater than the threshold, then the job
    // remained in the service queue for too long, so increase the
    // thread pool.
    std::cout << "Job job sat in queue for "
              << wait_in_seconds << " seconds." << std::endl;
    if (threshold_seconds_ < wait_in_seconds)
    {
        std::cout << "Increasing thread pool." << std::endl;
        threads_.create_thread(
            boost::bind(&boost::asio::io_service::run,
                        &io_service_));
    }

    // Schedule another pool check.
    schedule_check();
}

// Busy work functions.
void busy_work(boost::asio::io_service&,
               unsigned int);

void add_busy_work(boost::asio::io_service& io_service,
                   unsigned int count)
{
    io_service.post(
        boost::bind(busy_work,
                    boost::ref(io_service),
                    count));
}

void busy_work(boost::asio::io_service& io_service,
               unsigned int count)
{
    boost::this_thread::sleep(boost::posix_time::seconds(5));

    count += 1;

    // When the count is 3, spawn additional busy work.
    if (3 == count)
    {
        add_busy_work(io_service, 0);
    }
    add_busy_work(io_service, count);
}

int main()
{
    // Create io service.
    boost::asio::io_service io_service;

    // Add some busy work to the service.
    add_busy_work(io_service, 0);

    // Create thread group and thread_pool_checker.
    boost::thread_group threads;
    thread_pool_checker checker(io_service, threads,
                                3,  // Max pool size.
                                2,  // Create thread if job waits for 2 sec.
                                3); // Check if pool needs to grow every 3 sec.

    // Start running the io service.
    io_service.run();

    threads.join_all();

    return 0;
}

Output:

Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.

If I understand correctly, a busy_work task may wait in the queue for several seconds together with the pool check, even though the maximum number of threads has not yet been reached, because new threads are not created ahead of time. That makes this principle hard to use, because the dynamic behaviour should not degrade performance this much: a task should only take longer than it would with a static pool by the time required to create a new thread. Thanks anyway. – boqapt

@user484936: Your understanding is correct. Pool growth occurs _after_ degradation has been detected; it is one of the simplest ways to grow the pool and should not "degrade performance". If you want to allocate threads _when_ you know they are needed, then thread state needs to be managed, which introduces overhead for all threads and may require the state logic to be scattered throughout the code. If you want to allocate threads when you _predict_ they will be needed, then have a dedicated thread post a job into the service and wait on the response with a timeout. –
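
To make that last suggestion concrete, here is a minimal sketch of the dedicated probe thread, assuming Boost.Asio and Boost.Thread; the names probe_state, probe_handler, and grow_pool_on_demand are illustrative and not part of the answer's example.

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <cstddef>

// Shared state between the probe thread and the probe handler.
struct probe_state
{
    probe_state() : handled(false) {}
    boost::mutex mutex;
    boost::condition_variable condition;
    bool handled;
};

// Runs inside the io_service: mark the probe as serviced and wake the
// probe thread.
void probe_handler(probe_state& state)
{
    boost::lock_guard<boost::mutex> lock(state.mutex);
    state.handled = true;
    state.condition.notify_one();
}

// Dedicated thread: periodically post a no-op job and wait for it with a
// timeout. If the job is not serviced in time, the pool is saturated, so
// add another worker thread.
void grow_pool_on_demand(boost::asio::io_service& io_service,
                         boost::thread_group& threads,
                         std::size_t max_threads,
                         boost::posix_time::time_duration timeout,
                         boost::posix_time::time_duration period)
{
    while (threads.size() < max_threads)
    {
        probe_state state;
        io_service.post(boost::bind(&probe_handler, boost::ref(state)));

        boost::unique_lock<boost::mutex> lock(state.mutex);
        boost::system_time const deadline = boost::get_system_time() + timeout;

        // Wait until the probe is serviced or the deadline passes.
        while (!state.handled && state.condition.timed_wait(lock, deadline))
        {
        }

        if (!state.handled)
        {
            // The probe was not serviced within the timeout, so the
            // existing threads are busy; grow the pool.
            threads.create_thread(
                boost::bind(&boost::asio::io_service::run, &io_service));
        }

        // Let the probe handler finish before state goes out of scope.
        while (!state.handled)
            state.condition.wait(lock);

        lock.unlock();
        boost::this_thread::sleep(period);
    }
}

The probe would run on its own boost::thread, outside the worker thread_group, so it is not counted toward max_threads; a real implementation would also need a shutdown path so the probe does not block once the io_service stops.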

I wonder what happens in the case where only a single long-running task is executing: when the timer fires, we add a thread to the pool unnecessarily. If there are actually no more handlers to process at that moment, this approach seems wasteful to me. Please correct me if I am wrong. – russoue