2016-08-09 95 views
1

我需要建立多达三个不同的TCP连接到不同的服务器。所有这三个连接都需要不同的协议,不同的握手和不同的心跳。学习http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/example/cpp11/chat/chat_client.cpp,阅读这里的东西,并按照克里斯Kohlhoffs建议我试图执行它如下。boost :: asio和多个客户端连接使用异步

问题是,在这种体系结构中,无论我在做什么,在调用doConnect()中的shared_from_this()时都会收到bad_weak_pointer异常。

Importent这些只是一个未运行的代码片断,它可能包含错误! Importent

我有一个包含一些基本方法的基类。

Connection.h

class Connection : public std::enable_shared_from_this<Connection> 
{ 
public: 
    //! Ctor 
    inline Connection(); 
    //! Dtor 
    inline virtual ~Connection(); 
    inline void setReconnectTime(const long &reconnectAfterMilisec) 
    { 
    m_reconnectTime = boost::posix_time::milliseconds(reconnectAfterMilisec); 
    } 
    inline void setHandshakePeriod(const long &periodInMilisec) 
    { 
    m_handshakePeriod = boost::posix_time::milliseconds(periodInMilisec); 
    } 
    virtual void doConnect() = 0; 
    virtual void stop() = 0; 
    //... and some view more... 
} 

我有一个从基类派生那么我的三个等级。这里只有一个(也是核心部分)来描述该方法。

ConnectionA.h

//queues which containing also the age of the messages 
typedef std::deque<std::pair<handshakeMsg, boost::posix_time::ptime>> handskMsg_queue; 
typedef std::deque<std::pair<errorcodeMsg, boost::posix_time::ptime>> ecMsg_queue; 
typedef std::deque<std::pair<A_Msg, boost::posix_time::ptime>> A_Msg_queue; 

class ConnectionA : public Connection 
{ 
public: 
    ConnectionA(); 
    ConnectionA(const std::string& IP, const int &port); 
    ConnectionA& operator=(const ConnectionA &other); 
    virtual ~ConnectionA(); 
    virtual void stop() override; 
    virtual void doConnect() override; 
    void doPost(std::string &message); 
    void doHandshake(); 
    void sendErrorCode(const int &ec); 

    std::shared_ptr<boost::asio::io_service>m_ioS; 

private: 
    std::shared_ptr<tcp::socket> m_socket; 
    std::shared_ptr<boost::asio::deadline_timer> m_deadlineTimer; // for reconnetions 
    std::shared_ptr<boost::asio::deadline_timer> m_handshakeTimer; // for heartbeats 

    void deadlineTimer_handler(const boost::system::error_code& error); 
    void handshakeTimer_handler(const boost::system::error_code& error); 
    void doRead(); 
    void doWrite(); 

    std::string m_IP; 
    int m_port; 
    handskMsg_queue m_handskMsgQueue; 
    ecMsg_queue m_ecMsgQueue; 
    A_Msg_queue m_AMsgQueue; 
} 

ConnectionA.cpp

ConnectionA::ConnectionA(const std::string &IP, const int &port) 
: m_ioS() 
, m_socket() 
, m_deadlineTimer() 
, m_handshakeTimer() 
, m_IP(IP) 
, m_port(port) 
, m_handskMsgQueue(10) 
, m_ecMsgQueue(10) 
, m_AMsgQueue(10) 
{ 
    m_ioS = std::make_shared<boost::asio::io_service>(); 
    m_socket = std::make_shared<tcp::socket>(*m_ioS); 
    m_deadlineTimer = std::make_shared<boost::asio::deadline_timer>(*m_ioS); 
    m_handshakeTimer = std::make_shared<boost::asio::deadline_timer> (*m_ioS); 
    m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error)); 
    m_handshakeTimer->async_wait(boost::bind(&ConnectionA::handshakeTimer_handler, this, boost::asio::placeholders::error)); 
} 
ConnectionA::~ConnectionA() 
{} 

void ConnectionA::stop() 
{ 
    m_ioS->post([this]() { m_socket->close(); }); 
    m_deadlineTimer->cancel(); 
    m_handshakeTimer->cancel(); 
} 

void ConnectionA::doConnect() 
{ 
    if (m_socket->is_open()){ 
    return; 
    } 
    tcp::resolver resolver(*m_ioS); 
    std::string portAsString = std::to_string(m_port); 
    auto endpoint_iter = resolver.resolve({ m_IP.c_str(), portAsString.c_str() }); 
    m_deadlineTimer->expires_from_now(m_reconnectTime); 
    // this gives me a bad_weak_pointer exception!!! 
    auto self = std::static_pointer_cast<ConnectionA>(static_cast<ConnectionA*>(this)->shared_from_this()); 
    boost::asio::async_connect(*m_socket, endpoint_iter, [this, self](boost::system::error_code ec, tcp::resolver::iterator){ 
    if (!ec) 
    { 
     doHandshake(); 
     doRead(); 
    } 
    else { 
     // don't know if async_connect can fail but set the socket to open 
     if (m_socket->is_open()){ 
     m_socket->close(); 
     } 
    } 
    }); 
} 

void ConnectionA::doRead() 
{ 
    auto self(shared_from_this()); 
    boost::asio::async_read(*m_socket, 
    boost::asio::buffer(m_readBuf, m_readBufSize), 
    [this, self](boost::system::error_code ec, std::size_t){ 
    if(!ec){ 
     // check server answer for errors 
     } 
     doRead(); 
    } 
    else { 
     stop(); 
    } 
    }); 
} 

void ConnectionA::doPost(std::string &message) 
{ 
    A_Msg newMsg (message); 
    auto self(shared_from_this()); 
    m_ioS->post([this, self, newMsg](){ 
    bool writeInProgress = false; 
    if (!m_A_MsgQueue.empty()){ 
     writeInProgress = true; 
    } 
    boost::posix_time::ptime currentTime = time_traits_t::now(); 
    m_AMsgQueue.push_back(std::make_pair(newMsg,currentTime)); 
    if (!writeInProgress) 
    { 
     doWrite(); 
    }  
    }); 
} 

void ConnectionA::doWrite() 
{ 
    while (!m_AMsgQueue.empty()) 
    { 
    if (m_AMsgQueue.front().second + m_maxMsgAge < time_traits_t::now()){ 
      m_AMsgQueue.pop_front(); 
      continue; 
    } 
    if (!m_socket->is_open()){ 
     continue; 
    } 
    auto self(shared_from_this()); 
    boost::asio::async_write(*m_socket, 
     boost::asio::buffer(m_AMsgQueue.front().first.data(), 
     m_AMsgQueue.front().first.A_lenght), 
     [this, self](boost::system::error_code ec, std::size_t /*length*/) 
    { 
     if (!ec) // successful 
     { 
     m_handshakeTimer->expires_from_now(m_handshakePeriod); // reset timer 
     m_AMsgQueue.pop_front(); 
     doWrite(); 
     } 
     else { 
     if (m_socket->is_open()){ 
      m_socket->close(); 
     } 
     } 
    }); 
    } 
} 

void ConnectionA::deadlineTimer_handler(const boost::system::error_code& error){ 
    if (m_stopped){ 
    return; 
    } 
    m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error)); 
    if (!error && !m_socket->is_open()) // timer expired and no connection was established 
    {  
    doConnect(); 
    } 
    else if (!error && m_socket->is_open()){ // timer expired and connection was established 
    m_deadlineTimer->expires_at(boost::posix_time::pos_infin); // to reactivate timer call doConnect() 
    } 
} 

最后也有其封装这些类使其更舒适的使用另一个类:

TcpConnect.h

class CTcpConnect 
{ 
public: 
    /*! Ctor 
    */ 
    CTcpConnect(); 
    //! Dtor 
    ~CTcpConnect(); 

    void initConnectionA(std::string &IP, const int &port); 
    void initConnectionB(std::string &IP, const int &port); 
    void initConnectionC(std::string &IP, const int &port); 

    void postMessageA(std::string &message); 

    void run(); 
    void stop(); 
private: 
    ConnectionA m_AConnection; 
    ConnectionB m_BConnection; 
    ConnectionC m_CConnection; 
} 

TcpConnect.cpp

CTcpConnect::CTcpConnect() 
: m_AConnection() 
, m_BConnection() 
, m_CConnection() 
{} 

CTcpConnect::~CTcpConnect() 
{} 

void CTcpConnect::run(){ 
    [this](){ m_AConnection.m_ioS->run(); }; 
    [this](){ m_BConnection.m_ioS->run(); }; 
    [this](){ m_CConnection.m_ioS->run(); }; 
} 

void CTcpConnect::stop(){ 
    m_AConnection.stop(); 
    m_BConnection.stop(); 
    m_CConnection.stop(); 
} 

void CTcpConnect::initConnectionA(std::string &IP, const int &port) 
{ 
    m_AConnection = ConnectionA(IP, port); 
    m_AConnection.setMaxMsgAge(30000); 
    //... set some view parameter more 
    m_AConnection.doConnect(); 
} 
// initConnectionB & initConnectionC are quite the same 

void CTcpConnect::postMessageA(std::string &message) 
{ 
    m_AConnection.doWrite(message); 
} 

在我尝试也只有一个io_service对象(对于我的做法,这将是罚款)​​开始,但持有该服务只是作为参考了一些头痛,因为我的实现还需要连接的默认构造函数。现在每个连接都有自己的io服务。

任何想法如何让这段代码运行? 随时为其他架构提出建议。如果你能想出这些,一些片段会更好。我已经在这个实施过程中苦苦挣扎了数周了。我很感激每一个提示。

顺便说一句我用VS12的boost 1.61。

+0

使用asio井是一门艺术。如果我告诉你,asio定时器和套接字不应该被保存在共享指针中,并且io_service引用是一个传递给对象的简单引用,它首先听起来会违反直觉,但这是方式去做吧。通过将shared_from_this()的结果传递给异步处理程序,您的对象可以保持自己和它们的缓冲区处于活动状态。当你要停止对象时,请在所有活动的io对象上调用cancel()。优秀的处理程序将执行指示错误。 –

+0

Tnx为您的建议。我知道我知道,实际上每个人都在说。但问题是,这给你一些限制。然而,阅读这些帖子http://stackoverflow.com/questions/27697973/shared-from-this-causing-bad-weak-ptr和http://stackoverflow.com/questions/29623652/using-boostasioio-service-as -class-member-field我仍然希望将我的方法运行起来。 – GregPhil

回答

1

这就是问题所在:

m_AConnection = ConnectionA(IP, port); 

也就是说,ConnectionAConnectionenable_shared_from_this派生派生。这意味着ConnectionA必须实例化为shared_from_this的共享指针才能工作。

试试这个:

void CTcpConnect::initConnectionA(std::string &IP, const int &port) 
{ 
    m_AConnection = std::make_shared<ConnectionA>(IP, port); 
    m_AConnection->setMaxMsgAge(30000); 
    //... set some view parameter more 
    m_AConnection->doConnect(); 
} 

EDIT1:

你是对的。这是问题。现在我意识到,我打电话给io-service.run()的方式完全是废话。

这是非常罕见的使用超过一个io_service,和极其罕见的使用每个连接一个:)

但是,你知道,如果我需要投然后调用shared_from_this()?我注意到asynch_connect()可以在没有演员的情况下正常工作。

许多Asio的例子使用shared_from_this()为了方便起见,我举例来说,根本没有在我的项目中使用它。在使用Asio时,您需要小心一些规则。例如,如果lambda函数捕获一个指向保存缓冲区的对象的共享指针,则在执行相应的回调之前,读写缓冲区不得被破坏,这种情况可以保持平凡。

例如,你可以做这样的事情还有:

auto data = std::make_shared<std::vector<uint8_t>>(10); 
async_read(socket, 
      boost::asio::const_buffer(*data), 
      [data](boost::system::error_code, size_t) {}); 

这将是有效的,但将有你里面std::vector每个读取被分配一个新的数据性能缺点。

另一个原因是,当你看到一些你的一些lambda表达式shared_from_this()是可以看到有用的,他们往往有以下形式:

[this, self,...](...) {...} 

也就是说,你经常要使用this在他们里面。如果您还没有捕获到self,则需要使用其他措施确保this在调用处理程序时未被销毁。

+0

你说得对。这是问题。现在我意识到我调用'io_service.run()'的方式完全是垃圾。我现在看我如何能够实现这是正确和明确的方式。然而,你知道我是否需要演员,然后调用'shared_from_this()'?我注意到''asynch_connect()'可以在没有演员的情况下正常工作。 – GregPhil

+1

是的,正如@RichardHodges在上面评论中提到的那样,使用Asio通常是一门艺术。编辑我的答案来解决你的问题。 –

+0

非常感谢您的详细解释。现在,为什么需要我的课程的shared_ptr更清晰。不过,我认为这有点误解。我是否需要像'auto self = std :: static_pointer_cast (static_cast (this) - > shared_from_this());''因为我的类'ConnectionA'派生自'Connection',它来自'enable_share_from_this '或者足够做'auto self = share_from_this()'。两者似乎都有效。 – GregPhil