有可能与最初的方法的几个挑战:
boost::asio::io_service
不打算给从中导出或重新实现。请注意缺乏虚拟功能。
- 如果你的线程库没有提供查询线程状态的能力,那么状态信息需要单独管理。
另一种解决方法是将作业发布到io_service
,然后检查它在io_service
中的时间。如果准备运行时间与实际运行时间之间的时间增量高于某个阈值,则表示队列中的作业数多于服务队列的线程数。这样做的主要好处是动态线程池增长逻辑与其他逻辑分离。
以下是使用deadline_timer
完成此操作的示例。
- 设置
deadline_timer
从现在开始到期3
秒。
- 异步等待
deadline_timer
。从设置deadline_timer
时起,处理程序将准备好运行3
秒。
- 在异步处理程序中,检查相对于计时器何时到期的当前时间。如果它大于
2
秒,则io_service
队列正在备份,因此请将线程添加到线程池。
例子:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
class thread_pool_checker
: private boost::noncopyable
{
public:
thread_pool_checker(boost::asio::io_service& io_service,
boost::thread_group& threads,
unsigned int max_threads,
long threshold_seconds,
long periodic_seconds)
: io_service_(io_service),
timer_(io_service),
threads_(threads),
max_threads_(max_threads),
threshold_seconds_(threshold_seconds),
periodic_seconds_(periodic_seconds)
{
schedule_check();
}
private:
void schedule_check();
void on_check(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::deadline_timer timer_;
boost::thread_group& threads_;
unsigned int max_threads_;
long threshold_seconds_;
long periodic_seconds_;
};
void thread_pool_checker::schedule_check()
{
// Thread pool is already at max size.
if (max_threads_ <= threads_.size())
{
std::cout << "Thread pool has reached its max. Example will shutdown."
<< std::endl;
io_service_.stop();
return;
}
// Schedule check to see if pool needs to increase.
std::cout << "Will check if pool needs to increase in "
<< periodic_seconds_ << " seconds." << std::endl;
timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_));
timer_.async_wait(
boost::bind(&thread_pool_checker::on_check, this,
boost::asio::placeholders::error));
}
void thread_pool_checker::on_check(const boost::system::error_code& error)
{
// On error, return early.
if (error) return;
// Check how long this job was waiting in the service queue. This
// returns the expiration time relative to now. Thus, if it expired
// 7 seconds ago, then the delta time is -7 seconds.
boost::posix_time::time_duration delta = timer_.expires_from_now();
long wait_in_seconds = -delta.seconds();
// If the time delta is greater than the threshold, then the job
// remained in the service queue for too long, so increase the
// thread pool.
std::cout << "Job job sat in queue for "
<< wait_in_seconds << " seconds." << std::endl;
if (threshold_seconds_ < wait_in_seconds)
{
std::cout << "Increasing thread pool." << std::endl;
threads_.create_thread(
boost::bind(&boost::asio::io_service::run,
&io_service_));
}
// Otherwise, schedule another pool check.
run();
}
// Busy work functions.
void busy_work(boost::asio::io_service&,
unsigned int);
void add_busy_work(boost::asio::io_service& io_service,
unsigned int count)
{
io_service.post(
boost::bind(busy_work,
boost::ref(io_service),
count));
}
void busy_work(boost::asio::io_service& io_service,
unsigned int count)
{
boost::this_thread::sleep(boost::posix_time::seconds(5));
count += 1;
// When the count is 3, spawn additional busy work.
if (3 == count)
{
add_busy_work(io_service, 0);
}
add_busy_work(io_service, count);
}
int main()
{
using boost::asio::ip::tcp;
// Create io service.
boost::asio::io_service io_service;
// Add some busy work to the service.
add_busy_work(io_service, 0);
// Create thread group and thread_pool_checker.
boost::thread_group threads;
thread_pool_checker checker(io_service, threads,
3, // Max pool size.
2, // Create thread if job waits for 2 sec.
3); // Check if pool needs to grow every 3 sec.
// Start running the io service.
io_service.run();
threads.join_all();
return 0;
}
输出:
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 7 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 4 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 3 seconds.
Increasing thread pool.
Thread pool has reached its max. Example will shutdown.
如果我理解正确的话,busy_work任务可能在等待几秒钟队列以及池检查,即使最大的线程数量,也没有由于未提前创建新线程,因此尚未到达。这使得这个原则很难使用,因为动态特性不应该如此降低性能。它应该使任务执行时间更长,而仅仅需要创建新线程所需的时间,而不是静态池所需的时间。不管怎样,谢谢你。 – boqapt
@ user484936:你的理解是正确的。游泳池增长发生_已被检测到退化;它是汇聚的最简单方法之一,不应“降低绩效”。如果你想分配线程_当你知道它们是需要的,那么线程状态需要被管理,为所有线程引入开销,并且可能需要状态逻辑分散在整个代码中。如果你想分配线程,你预测它们将被需要,然后有一个专门的线程在服务中发布一个工作,然后定时等待响应。 –
我想知道在只有一个长时间运行的任务执行的情况下会发生什么,当我们的计时器触发时我们不必要地将一个线程添加到池中。如果当时实际上没有更多的事件需要处理,那么这种方法对我来说似乎是无效的。如果我错了,请纠正我。 – russoue