2011-05-26 95 views
2

我的想法是创建 X 个线程,每个线程都运行 KeepRunning 方法;该方法在一个无限循环中调用 _io_service.run(),并且在接收到新连接时,在 async_accept 的处理程序中通过 _io_service.post() 把任务投递给 _io_service。

标题:boost::asio 多线程服务器——异步接受连接、阻塞式读/写

我运行服务器与这样的代码:

// Construct the server; the constructor arguments are address, port and thread-pool size.
oh::msg::OHServer s("0.0.0.0", "9999", 200); 
    // Store a callable that invokes s.Stop(); presumably it is called from bConsoleHandler (not shown here).
    ConsoleStopServer = boost::bind(&oh::msg::OHServer::Stop, &s); 
    // Register bConsoleHandler as a console control handler (Win32 API).
    SetConsoleCtrlHandler(bConsoleHandler, TRUE); 
    // Start the worker threads; Run() blocks until a character is read from stdin.
    s.Run(); 

但是,当我接受一个连接,并在 Post() 方法中通过 MsgWorker 类用阻塞式读/写来处理它时,所有线程都会退出。

我的代码如下(它混合了 ASIO 的 HTTP 服务器示例和我自己的代码):

/// Builds the server: resolves the listen endpoint, prepares the acceptor and
/// queues the first asynchronous accept.
///
/// @param sAddress  address to listen on, passed to the resolver
/// @param sPort     port or service name, passed to the resolver
/// @param tps       number of worker threads Run() will create
///
/// The throwing overloads of resolve/open/set_option/bind/listen are used, so
/// a failure in any of them propagates out of the constructor.
OHServer::OHServer(const std::string& sAddress, const std::string& sPort, std::size_t tps)
    : _nThreadPoolSize(tps),
      _acceptor(_io_service),
      _sockClient(new boost::asio::ip::tcp::socket(_io_service))
{
    using boost::asio::ip::tcp;

    // Take the first endpoint the resolver reports for address/port.
    tcp::resolver lookup(_io_service);
    tcp::resolver::query what(sAddress, sPort);
    tcp::endpoint listenOn = *lookup.resolve(what);

    // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
    _acceptor.open(listenOn.protocol());
    _acceptor.set_option(tcp::acceptor::reuse_address(true));
    _acceptor.bind(listenOn);
    _acceptor.listen();

    // Queue the first accept; AcceptConnection is invoked with the result.
    _acceptor.async_accept(
        *_sockClient,
        boost::bind(&OHServer::AcceptConnection, this, boost::asio::placeholders::error));
}


void OHServer::KeepRunning() 
{ 
    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
      << "] Thread Start" << std::endl; 
    global_stream_lock.unlock(); 

    while(true) 
    { 
      try 
      { 
        boost::system::error_code ec; 
        _io_service.run(ec); 
        if(ec) 
        { 
          global_stream_lock.lock(); 
          std::cout << "[" << boost::this_thread::get_id() 
            << "] Error: " << ec << std::endl; 
          global_stream_lock.unlock(); 
        } 
        break; 
      } 
      catch(std::exception & ex) 
      { 
        global_stream_lock.lock(); 
        std::cout << "[" << boost::this_thread::get_id() 
          << "] Exception: " << ex.what() << std::endl; 
        global_stream_lock.unlock(); 
      } 
    } 

    global_stream_lock.lock(); 
    std::cout << "[" << boost::this_thread::get_id() 
      << "] Thread Finish" << std::endl; 
    global_stream_lock.unlock(); 
} 

void OHServer::Run() 
{ 
    // Create a pool of threads to run all of the io_services. 

    for (std::size_t i = 0; i < _nThreadPoolSize; ++i) 
    { 
     boost::shared_ptr<boost::thread> thread(new boost::thread(
       boost::bind(&OHServer::KeepRunning, this))); 
     threads.push_back(thread); 
    } 

    cout << "Hit enter to close server" << endl; 
    cin.get(); 


} 

void OHServer::Stop() 
{ 
    boost::system::error_code ec; 
    _acceptor.close(ec); 

    _sockClient->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); 
    _sockClient->close(ec); 

    _io_service.stop(); 

    // Wait for all threads in the pool to exit. 
    for (std::size_t i = 0; i < threads.size(); ++i) 
    { 
     threads[i]->join(); 
     cout << "threads[ "<< i << "]->join();" << endl; 
    } 
} 

/// Task posted to the io_service for each accepted connection: constructs a
/// CMsgWorker on _sockClient and runs it via Start().
///
/// The worker lives in automatic storage, so it is destroyed even when
/// Start() throws (the previous new/delete pair leaked it in that case).
void OHServer::Post()
{
    std::cout << "Accepted new connection." << std::endl;
    CMsgWorker msgWorker(_sockClient);
    msgWorker.Start();
}

/// Completion handler for async_accept.
///
/// On success it posts Post() to the io_service and queues the next accept.
/// On failure it reports the error (except operation_aborted, which is what
/// closing the acceptor in Stop() produces) and does not re-arm, so the
/// reason the accept loop ended is visible instead of silently lost.
///
/// @param e  result of the accept operation
void OHServer::AcceptConnection(const boost::system::error_code& e)
{
    if (e)
    {
        if (e != boost::asio::error::operation_aborted)
        {
            std::cerr << "Accept failed: " << e.message() << std::endl;
        }
        return;
    }

    _io_service.post(boost::bind(&OHServer::Post, this));

    // NOTE(review): the next accept is queued on the same *_sockClient that
    // was just accepted and is still in use by Post(). An accept into an
    // already-open socket is expected to complete with an "already open"
    // error, which would end the accept loop and let run() return in every
    // thread. A fresh socket per accept is needed; that requires a change to
    // the class beyond this function - confirm and fix there.
    _acceptor.async_accept(
        *_sockClient,
        boost::bind(&OHServer::AcceptConnection, this, boost::asio::placeholders::error));
}

我应该怎么做,才能让这些线程继续等待来自 _io_service 的任务?

感谢您的帮助!

+0

做异步接受操作,但同步读写操作有什么意义? – 2011-05-26 20:19:53

回答

1

看看这个:

// Kick off 5 threads 
    for (size_t i = 0; i < 5; ++i) { 
    // Each new thread executes io.run(); `threads` is presumably a boost::thread_group (its create_thread result is stored in a boost::thread*).
    boost::thread* t = threads.create_thread(boost::bind(&boost::asio::io_service::run, &io)); 
    // Report the loop index and the id of the thread that was just created.
    std::cout << "Creating thread " << i << " with id " << t->get_id() << std::endl; 
    } 

关于如何实现这一点,可以参考这里的 timer.cc 示例:https://github.com/sean-/Boost.Examples/tree/master/asio/timer

1

最后,我写出了一个易于使用的服务器版本:

用法:

// Owns the TCP server for the lifetime of the program; set by CMyServer::Start().
boost::shared_ptr<CTCPServer> _serverPtr;

// Creates the server, queues its start-up task and then launches the worker
// threads that run the io_service.
//
// The start-up task is posted BEFORE the threads are created: io_service::run()
// returns as soon as there is no work, so threads started on an empty
// io_service could exit before RunServer was ever queued.
void CMyServer::Start()
{
    //First we create a server object
    _serverPtr.reset(new CTCPServer(&_io_service, PORT_NUMBER));

    //Then we queue the task that starts the server. RunServer takes a raw
    //CTCPServer*, so the raw pointer is bound; _serverPtr keeps the object alive.
    _io_service.post(boost::bind(&CMyServer::RunServer, _serverPtr.get(), &CMyServer::HandleMessage));

    //And finally we create the threads that execute the queued work
    for (int i = 0; i < COHConfig::_iThreads; ++i)
    {
        _threads.create_thread(bind(&io_service::run, &_io_service));
    }
}

//This is the function which is called by io_service to start our server 
// It only forwards to CTCPServer::Run.
//   s              - server to start; dereferenced without a null check
//   HandleFunction - connection callback, passed through unchanged to s->Run()
void CMyServer::RunServer(CTCPServer* s, void (*HandleFunction)(shared_ptr<ip::tcp::socket>, deadline_timer*)) 
{ 
    s->Run(HandleFunction); 
} 

//And this is our connection handler
// Logs the peer address and port, then lets a CMyWorker serve the connection.
//   sockClient - the accepted connection
//   timer      - not used by this handler
// remote_endpoint() is the throwing overload: if the peer cannot be queried
// the exception propagates to the caller.
void CMyServer::HandleMessage(shared_ptr<ip::tcp::socket> sockClient, deadline_timer* timer)
{
    // Query the peer once and reuse it for both address and port.
    const ip::tcp::endpoint peer = sockClient->remote_endpoint();
    cout << "Handling connection from: " << peer.address().to_string() << ":" << peer.port() << endl;

    //This is some class which gets socket in its constructor and handles the connection
    // (The original declared `myWorker` but called `msgWorker`, which is not declared.)
    CMyWorker myWorker(sockClient);
    myWorker.Start();
}

//Thanks to this function we can stop our server
// Does nothing when no server has been created yet (_serverPtr is empty until
// Start() runs), instead of dereferencing a null pointer.
void CMyServer::Stop()
{
    if (_serverPtr)
    {
        _serverPtr->Stop();
    }
}

TCPServer.hpp 文件:

#ifndef TCPSERVER_HPP 
#define TCPSERVER_HPP 

#if defined(_WIN32) 
    #define BOOST_THREAD_USE_LIB 
#endif 

#include <boost/asio.hpp> 
#include <boost/noncopyable.hpp> 
#include <boost/shared_ptr.hpp> 
#include <string> 
#include <vector> 

class CTCPServer: private boost::noncopyable 
{ 
private: 
    bool bKeepRunning; 

    boost::asio::io_service* _io_service; 
    std::string _sPort; 
    boost::asio::ip::tcp::acceptor _acceptor; 
    boost::shared_ptr<boost::asio::ip::tcp::socket> _sockClient; 
    boost::asio::deadline_timer _timer; 
    bool _bIPv6; 

    std::string SessionID(); 
public: 

    CTCPServer(boost::asio::io_service* ios, const std::string& sPort, bool bIPv6=false): 
     _sPort(sPort), 
     _acceptor(*ios), 
     _timer(*ios), 
     _bIPv6(bIPv6) 
    { 
     _io_service = ios; 
     bKeepRunning = false; 
    }; 
    void Run(void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket> sock, boost::asio::deadline_timer* timer)); 
    void AsyncAccept(void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket> , boost::asio::deadline_timer*)); 
    void AcceptHandler(const boost::system::error_code& e, void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket>, boost::asio::deadline_timer*)); 
    void Stop(); 
    void Stop(void (*StopFunction)()); 

}; 

#endif 

TCPServer.cpp 文件:

#include "TCPServer.hpp" 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread/mutex.hpp> 
#include <iostream> 

using namespace std; 

/// Builds the log prefix "[<id of the calling thread>] ".
string CTCPServer::SessionID()
{
    ostringstream prefix;
    prefix << '[' << boost::this_thread::get_id() << ']' << ' ';
    return prefix.str();
}

/// Opens, configures, binds and listens on the acceptor for the configured
/// port (IPv6 when _bIPv6 is set, IPv4 otherwise), then starts accepting.
///
/// @param HandleFunction  callback that receives each accepted socket and a
///                        pointer to the server's timer
///
/// Failures are reported on std::cerr and not rethrown. If set-up fails
/// before the server was running, the acceptor is closed again so a later
/// Run() does not fail because it was left open.
void CTCPServer::Run(void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket> , boost::asio::deadline_timer*))
{
    try
    {
        // Pick the protocol once instead of duplicating the resolve code.
        const boost::asio::ip::tcp protocol =
            _bIPv6 ? boost::asio::ip::tcp::v6() : boost::asio::ip::tcp::v4();

        boost::asio::ip::tcp::resolver resolver(*_io_service);
        boost::asio::ip::tcp::resolver::query query(protocol, _sPort);
        const boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

        _acceptor.open(endpoint.protocol());
        _acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        _acceptor.set_option(boost::asio::socket_base::enable_connection_aborted(true));
        _acceptor.bind(endpoint);
        _acceptor.listen();

        bKeepRunning = true;
        AsyncAccept(HandleFunction);
    }
    catch(std::exception& e)
    {
        // Only undo a failed set-up; an acceptor that is already serving
        // (Run() called twice) must be left alone.
        if(!bKeepRunning)
        {
            boost::system::error_code ignored;
            _acceptor.close(ignored);
        }

        const char* const family = _bIPv6 ? "IPv6" : "IPv4";
        std::cerr << "Exception wile creating " << family << " TCP socket on port "<< _sPort<< ": " << e.what() << std::endl;
    }
}

/// Queues one asynchronous accept, unless the server has been stopped.
///
/// A new socket is created for every accept, so the shared_ptr copy handed
/// to the handler for the previous connection is not affected.
///
/// @param HandleFunction  callback forwarded to AcceptHandler
void CTCPServer::AsyncAccept(void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket> , boost::asio::deadline_timer*))
{
    if(!bKeepRunning)
        return;

    try
    {
        _sockClient.reset(new boost::asio::ip::tcp::socket(*_io_service));
        cout << SessionID() << "Waiting for connection on port: " << _sPort << endl;
        _acceptor.async_accept(*_sockClient, boost::bind(&CTCPServer::AcceptHandler, this, boost::asio::placeholders::error, HandleFunction));
    }
    catch(exception& e)
    {
        cout << SessionID() << "Error while accepting connection: " << e.what() << endl;
    }
}

/// Completion handler for async_accept.
///
/// On success the accepted socket is posted to HandleFunction. In every case
/// except operation_aborted the next accept is queued, so a single failed
/// accept does not stop the server from accepting for good. Run() enables
/// enable_connection_aborted, so such failures are reported to this handler.
/// AsyncAccept itself does nothing once Stop() has cleared bKeepRunning.
///
/// @param e               result of the accept operation
/// @param HandleFunction  callback that receives the accepted socket and a
///                        pointer to the server's timer
void CTCPServer::AcceptHandler(const boost::system::error_code& e,
          void (*HandleFunction)(boost::shared_ptr<boost::asio::ip::tcp::socket>,
                boost::asio::deadline_timer*))
{
    // Cancelling or closing the acceptor (see Stop()) ends the accept loop.
    if(e == boost::asio::error::operation_aborted)
        return;

    try
    {
        if(!e)
        {
            (*_io_service).post(boost::bind(HandleFunction, _sockClient, &_timer));
        }
        else
        {
            cout << SessionID() << "Accept failed: " << e.message() << endl;
        }
        AsyncAccept(HandleFunction);
    }
    catch(exception& ex)
    {
        cout << SessionID() << "Exception: " << ex.what() << endl;
    }
}

void CTCPServer::Stop() 
{ 
    cout << SessionID() << "STOP port " << _sPort << endl; 

    if(!bKeepRunning) 
     return; 

    bKeepRunning = false; 

    try 
    { 
     _sockClient->close(); 
    } 
    catch(exception& e) 
    { 
     cout << SessionID() << "Exception: " << e.what() << endl; 
    } 

    try 
    { 
     _acceptor.cancel(); 
    } 
    catch(exception& e) 
    { 
     cout << SessionID() << "Exception: " << e.what() << endl; 
    } 

    try 
    { 
     _acceptor.close(); 
    } 
    catch(exception& e) 
    { 
     cout << SessionID() << "Exception: " << e.what() << endl; 
    } 
} 

/// Stops the server, then invokes the caller's callback.
///
/// @param StopFunction  called after Stop(); a null pointer is skipped
///                      instead of being called
void CTCPServer::Stop(void (*StopFunction)())
{
    Stop();
    if(StopFunction)
    {
        StopFunction();
    }
}

它也非常容易修改,以支持IPv6兼容。 它已经过测试,效果非常好。只需复制它并使用!

相关问题