我的想法是创建X个线程,使用KeepRunning方法运行它,该方法有无限循环调用_io_service.run()并在接收到新连接时使用_io_service发送任务async_accept处理程序中的_io_service.poll()。boost :: asio多线程异步接受阻塞读/写服务器
我运行服务器与这样的代码:
oh::msg::OHServer s("0.0.0.0", "9999", 200);
ConsoleStopServer = boost::bind(&oh::msg::OHServer::Stop, &s);
SetConsoleCtrlHandler(bConsoleHandler, TRUE);
s.Run();
但是当我接收一个连接,然后使用阻塞读取服务于它在后()方法在MsgWorker类/写,那么所有的线程都正在关闭。
我有一个像下面的代码(这是从http server(服务器)ASIO的一些混合的例子,我的):
OHServer::OHServer(const std::string& sAddress, const std::string& sPort, std::size_t tps)
: _nThreadPoolSize(tps), _acceptor(_io_service), _sockClient(new boost::asio::ip::tcp::socket(_io_service))
{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(_io_service);
boost::asio::ip::tcp::resolver::query query(sAddress, sPort);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
_acceptor.open(endpoint.protocol());
_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
_acceptor.bind(endpoint);
_acceptor.listen();
_acceptor.async_accept(
*_sockClient,
boost::bind(
&OHServer::AcceptConnection,
this,
boost::asio::placeholders::error
)
);
}
void OHServer::KeepRunning()
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Start" << std::endl;
global_stream_lock.unlock();
while(true)
{
try
{
boost::system::error_code ec;
_io_service.run(ec);
if(ec)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Error: " << ec << std::endl;
global_stream_lock.unlock();
}
break;
}
catch(std::exception & ex)
{
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Exception: " << ex.what() << std::endl;
global_stream_lock.unlock();
}
}
global_stream_lock.lock();
std::cout << "[" << boost::this_thread::get_id()
<< "] Thread Finish" << std::endl;
global_stream_lock.unlock();
}
void OHServer::Run()
{
// Create a pool of threads to run all of the io_services.
for (std::size_t i = 0; i < _nThreadPoolSize; ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&OHServer::KeepRunning, this)));
threads.push_back(thread);
}
cout << "Hit enter to close server" << endl;
cin.get();
}
void OHServer::Stop()
{
boost::system::error_code ec;
_acceptor.close(ec);
_sockClient->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
_sockClient->close(ec);
_io_service.stop();
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
{
threads[i]->join();
cout << "threads[ "<< i << "]->join();" << endl;
}
}
void OHServer::Post()
{
std::cout << "Accepted new connection." << std::endl;
CMsgWorker *msgWorker = new CMsgWorker(_sockClient);
msgWorker->Start();
delete msgWorker;
}
void OHServer::AcceptConnection(const boost::system::error_code& e)
{
if (!e)
{
_io_service.post(boost::bind(&OHServer::Post, this));
_acceptor.async_accept(
*_sockClient,
boost::bind(
&OHServer::AcceptConnection,
this,
boost::asio::placeholders::error
)
);
}
}
我应该怎么做线程是仍在等待一些工作从_io_service办?
感谢您的帮助!
做异步接受操作,但同步读写操作有什么意义? – 2011-05-26 20:19:53