2017-04-02 74 views
0

我想从我的项目的boost TCP客户端示例中创建一个客户端类,并且我注意到有时在连接到不存在的主机时handle_connect不会被调用。async_connect不会在TCP客户端类中调用处理程序

我在堆栈上看过类似的问题,在这里人们忘记运行io_service或在任何任务发布之前调用它,但我不认为这是我的情况,因为我刚刚启动io_service.run()线程调用async_connect,并成功连接,网络不可达,以及我测试过的其他一些案例工作得很好。

以下是完整的清单:

tcp_client.hpp

#ifndef TCP_CLIENT_HPP 
#define TCP_CLIENT_HPP 

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <boost/thread/thread.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/make_shared.hpp> 
#include <mutex> 
#include <iostream> 
#include <iomanip> 

namespace com { 

using boost::asio::ip::tcp; 
using namespace std; 

class client : public boost::enable_shared_from_this<client> { 

private: 

    std::mutex mx_; 
    bool stopped_ = 1; 
    boost::asio::streambuf ibuf_; 
    boost::shared_ptr<boost::asio::io_service> io_service_; 
    boost::shared_ptr<boost::asio::ip::tcp::socket> sock_; 
    boost::shared_ptr<tcp::resolver::iterator> ei_; 
    std::vector<std::string> inbound_; 
    std::string host_, port_; 

public: 

    client() {} 

    void connect(std::string host, std::string port) { 
    if (!stopped_) stop(); 
    host_ = host; port_ = port; 
    io_service_.reset(new boost::asio::io_service); 
    sock_.reset(new boost::asio::ip::tcp::socket(*io_service_)); 
    ei_.reset(new tcp::resolver::iterator); 
    tcp::resolver r(*io_service_); 
    ei_ = boost::make_shared<tcp::resolver::iterator>(r.resolve(tcp::resolver::query(host_, port_))); 
    stopped_ = 0; 
    start_connect(); 
    boost::thread work(boost::bind(&client::work, shared_from_this())); 
    return; 
    } 

    bool is_running() { 
    return !stopped_; 
    } 

    void stop() { 
    stopped_ = 1; 
    sock_->close(); 
    return; 
    } 

    void send(std::string str) { 
    if (stopped_) return; 
    auto msg = boost::asio::buffer(str, str.size()); 
    boost::asio::async_write((*sock_), msg, boost::bind(&client::handle_write, shared_from_this(), _1)); 
    return; 
    } 

    std::string pull() { 
    std::lock_guard<std::mutex> lock(mx_); 
    std::string msg; 
    if (inbound_.size()>0) { 
     msg = inbound_.at(0); 
     inbound_.erase(inbound_.begin()); 
    } 
    return msg; 
    } 

    int size() { 
    std::lock_guard<std::mutex> lock(mx_); 
    return inbound_.size(); 
    } 

    void clear() { 
    std::lock_guard<std::mutex> lock(mx_); 
    inbound_.clear(); 
    return; 
    } 

private: 

    void work() { 
    if (stopped_) return; 
    std::cout<<"work in"<<std::endl; 
    io_service_->run(); 
    std::cout<<"work out"<<std::endl; 
    return; 
    } 

    void start_connect() { 
    if ((*ei_) != tcp::resolver::iterator()) { 
     std::cout<<"Trying "<<(*ei_)->endpoint()<<std::endl; 
     sock_->async_connect((*ei_)->endpoint(), boost::bind(&client::handle_connect, shared_from_this(), boost::asio::placeholders::error)); 
    } else { 
     stop(); 
    } 
    return; 
    } 

    void handle_connect(const boost::system::error_code& ec) { 
    if (stopped_) return; 

    if (!sock_->is_open()) { 
     std::cout<<"Socket closed"<<std::endl; 
     (*ei_)++; 
     start_connect(); 
    } else if (ec) { 
     std::cout<<"Connect error: "<<ec.message()<<std::endl; 
     sock_->close(); 
     (*ei_)++; 
     start_connect(); 
    } else { 
     std::cout<<"Connected to "<<(*ei_)->endpoint()<<std::endl; 
     start_read(); 
    } 

    return; 
    } 

    void start_read() { 
    if (stopped_) return; 
    boost::asio::async_read_until((*sock_), ibuf_, "", boost::bind(&client::handle_read, shared_from_this(), boost::asio::placeholders::error)); 
    return; 
    } 

    void handle_read(const boost::system::error_code& ec) { 
    std::lock_guard<std::mutex> lock(mx_); 
    if (stopped_) return; 
    if (ec) { 
     std::cout<<"Read error: "<<ec.message()<<std::endl; 
     stop(); 
     return; 
    } 

    std::string line; 
    std::istream is(&ibuf_); 
    std::getline(is, line); 
    if (!line.empty() && inbound_.size()<1000) inbound_.push_back(line); 

    start_read(); 
    return; 
    } 

private: 

    void handle_write(const boost::system::error_code& ec) { 
    if (stopped_) return; 
    if (ec) { 
     std::cout<<"Write error: "<<ec.message()<<std::endl; 
     stop(); 
     return; 
    } 
    return; 
    } 

}; 

}; 

tcp_test.cpp

#include "tcp_client.hpp" 

int main(int argc, char* argv[]) { 
    auto tcp_client = boost::shared_ptr<com::client>(new com::client); 

    try { 
    tcp_client->connect("192.168.1.15", "50000"); 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(1000)); 
    tcp_client->connect("192.168.1.20", "50000"); 
    } catch (std::exception& e) { 
    std::cerr<<"Exception: "<<e.what()<<std::endl; 
    } 

    int cnt=0; 
    while (cnt<5) { 
    std::cout<<cnt<<std::endl; 
    cnt++; 
    tcp_client->send("<test>"); 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(500)); 
    } 

    tcp_client->stop(); 

    while (tcp_client->size()>0) std::cout<<tcp_client->pull()<<std::endl; 

    return 0; 
} 

我得到的输出连接到环回服务器时:

Trying 192.168.1.15:50000 
work in 
work out 
Trying 192.168.1.20:50000 
0 
work in 
Connected to 192.168.1.20:50000 
1 
2 
3 
4 
work out 
<test> 
<test> 
<test> 
<test> 
<test> 

192.168.1.20正如你所看到的那样工作。 192.168.1.15并不存在,但我预料它会引发某种错误。相反,io_service.run()立即返回,就像async_connect永远不会发布回调任务。也许它与端点迭代器有关而不是async_connect?

任何人都可以请解释为什么会发生这样的事情?

然后我试着在此代码隔离问题:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <boost/thread/thread.hpp> 

boost::asio::io_service io_svc; 
boost::asio::ip::tcp::socket sock(io_svc); 
boost::asio::ip::tcp::resolver::iterator ei; 

void work() { 
    std::cout<<"work in"<<std::endl; 
    io_svc.run(); 
    std::cout<<"work out"<<std::endl; 
    return; 
} 

void stop() { 
    sock.close(); 
    return; 
} 

void start_connect(); 

void handle_connect(const boost::system::error_code& ec) { 
    if (!sock.is_open()) { 
    std::cout<<"Socket closed"<<std::endl; 
    ei++; 
    start_connect(); 
    } else if (ec) { 
    std::cout<<"Connect error: "<<ec.message()<<std::endl; 
    sock.close(); 
    ei++; 
    start_connect(); 
    } else { 
    std::cout<<"Connected to "<<ei->endpoint()<<std::endl; 
    } 
    return; 
} 

void start_connect() { 
    if (ei != boost::asio::ip::tcp::resolver::iterator()) { 
    std::cout<<"Trying "<<ei->endpoint()<<std::endl; 
    sock.async_connect(ei->endpoint(), boost::bind(handle_connect, boost::asio::placeholders::error)); 
    } else { 
    stop(); 
    } 
    return; 
} 

int main(int argc, char* argv[]) { 

    std::string host="192.168.1.15", port="50000"; 

    boost::asio::ip::tcp::resolver r(io_svc); 
    ei = r.resolve(boost::asio::ip::tcp::resolver::query(host, port)); 
    start_connect(); 
    boost::thread* thr = new boost::thread(work); 

    boost::this_thread::sleep_for(boost::chrono::milliseconds(2000)); 

    return 0; 
} 

但我有一个完全不同的结果。当我尝试连接到一个不存在的主机,大部分的时间是:

Trying 192.168.1.15:50000 
work in 

有时是:

Trying 192.168.1.15:50000 
work in 
Connect error: Operation canceled 
Connect error: Operation canceled 

,很少是:

Trying 192.168.1.15:50000 
work in 
Segmentation fault 

“制定出”永远不会打印,所以我猜这个例子中的io_service正在做一些事情,但是这与以前的代码有什么不同,以及为什么我有时只会得到“操作取消”错误?

+0

operation_canceled暗示的东西寿命临终前有一个机会,有调用的回调。另一方面,不需要客户端中的所有shared_ptrs。这足以让你的眼睛流血。客户端已由shared_ptr拥有。这足以控制所有asio对象的生命周期。 –

回答

0

在后台线程中运行的客户端应该看起来像这样。

请注意,我有注意事项包括连接超时。为此,您希望有一个与async_connect并行运行的截止日期计时器。然后,您必须正确处理交叉案例(提示:取消成功连接的截止日期计时器,并从async_wait中丢弃随后发生的错误)。

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/chrono.hpp> 
#include <thread> 
#include <functional> 

boost::asio::io_service io_svc; 

struct client 
    : std::enable_shared_from_this<client> 
{ 
    using protocol = boost::asio::ip::tcp; 
    using resolver = protocol::resolver; 
    using socket = protocol::socket; 

    using error_code = boost::system::error_code; 

    client(boost::asio::io_service& ios) 
     : ios_(ios) {} 

    void start(std::string const& host, std::string const& service) 
    { 
     auto presolver = std::make_shared<resolver>(get_io_service()); 

     presolver->async_resolve(protocol::resolver::query(host, service), 
           strand_.wrap([self = shared_from_this(), presolver](auto&& ec, auto iter) 
               { 
                self->handle_resolve(ec, presolver, iter); 
               })); 

    } 

private: 
    void 
    handle_resolve(boost::system::error_code const& ec, std::shared_ptr<resolver> presolver, resolver::iterator iter) 
    { 
     if (ec) { 
      std::cerr << "error resolving: " << ec.message() << std::endl; 
     } 
     else { 
      boost::asio::async_connect(sock, iter, strand_.wrap([self = shared_from_this(), 
                    presolver] 
                    (auto&& ec, auto iter) 
                   { 
                    self->handle_connect(ec, iter); 
                    // note - we're dropping presolver here - we don't need it any more 
                   })); 
     } 
    } 

    void handle_connect(error_code const& ec, resolver::iterator iter) 
    { 
     if (ec) { 
      std::cerr << "failed to connect: " << ec.message() << std::endl; 
     } 
     else { 
      auto payload = std::make_shared<std::string>("Hello"); 

      boost::asio::async_write(sock, boost::asio::buffer(*payload), 
            strand_.wrap([self = shared_from_this(), 
                 payload] // note! capture the payload so it continues to exist during async send 
                 (auto&& ec, auto size) 
                { 
                 self->handle_send(ec, size); 
                })); 
     } 
    } 

    void handle_send(error_code const& ec, std::size_t size) 
    { 
     if (ec) { 
      std::cerr << "send failed after " << size << " butes : " << ec.message() << std::endl; 
     } 
     else { 
      // send something else? 
     } 
    } 

    boost::asio::io_service& get_io_service() 
    { 
     return ios_; 
    } 

private: 

    boost::asio::io_service& ios_; 
    boost::asio::strand strand_{get_io_service()}; 
    socket    sock{get_io_service()}; 

}; 

void work() 
{ 
    std::cout << "work in" << std::endl; 
    io_svc.run(); 
    std::cout << "work out" << std::endl; 
    return; 
} 

int main(int argc, char *argv[]) 
{ 

    auto  pclient = std::make_shared<client>(io_svc); 
    std::string host = "192.168.1.15", port = "50000"; 
    pclient->start(host, port); 

    auto run_thread = std::thread(work); 
    if (run_thread.joinable()) 
     run_thread.join(); 

    return 0; 
} 

输出示例:

work in 
    <time passes>... 
failed to connect: Operation timed out 
work out 
+0

首先 - 感谢您的努力。我并不是想表现出忘恩负义,但我一直在寻找解释为什么io_service.run()在一个特定情况下没有做任何事情而返回,也许为什么在非类版本中,当我试图找到问题时,我有这种不稳定的行为。我不是真的希望你做我的编码,我只是想知道发生了什么,并明白我在这里错过了什么。 – thu87l

+0

@ thu87l看看我是如何管理生命和你是如何的区别。你的问题在那里 –

+0

我想我看看问题在哪里。使用resolver.async_resolve()并传递解析器以确保它的活动时间足够长,以便调用handle_resolve()。然后当查询解决时,您检查错误。 在我的课中,我使用了endpoint_iterator = resolver.resolve()。我不确定它是如何工作的(例如:[link](http://www.boost.org/doc/libs/1_45_0/doc/html/boost_asio/example/timeouts/async_tcp_client.cpp)),所以我认为它阻塞直到解决。 – thu87l