2014-03-26 68 views
2

我试图使用async_readasync_write发送一个简单的tcp请求到一个超时的服务器。boost :: asio :: async_read在换行上返回文件错误结尾

问题是async_read尝试读取时发生错误,直到传输结束,第一个'\ n'返回错误(文件结束)。

当逐行读取字符串时(当eots-> at(last_request)='\ n')时,它会成功读取整个响应。

if(eots->at(last_request)=="") // read until end 
     { 
      boost::asio::async_read(
       socket_ 
       , input_buffer_ 
       , boost::asio::transfer_at_least(1) // read untill end or error 
       , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error) 
       ); 
     }else 
     { 
      boost::asio::async_read_until(
       socket_ 
       , input_buffer_ 
       , eots->at(last_request) // read until current request end of transmission sign/string or error 
       , boost::bind(&tcp_client::do_Requests_read_handle, this, _1) 
       ); 
     } 

这是预期的行为吗?我做对了吗?

对于测试,我试图做一个whois查询(参数whois.iana.org 43 com)。

的完整代码:

/* 
* MK async TCP 
* contains basic definitions for extractions and string filtering 
* 
*/ 
#ifndef MK_ASYNC_TCP_HPP 
#define MK_ASYNC_TCP_HPP 
// 
// async_tcp_client.cpp 
// ~~~~~~~~~~~~~~~~~~~~ 
// 
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) 
// 
// Distributed under the Boost Software License, Version 1.0. (See accompanying 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 
// 

#include <boost/asio/deadline_timer.hpp> 
#include <boost/asio/io_service.hpp> 
#include <boost/asio/ip/tcp.hpp> 
#include <boost/asio/read_until.hpp> 
#include <boost/asio/read.hpp> 
#include <boost/asio/streambuf.hpp> 
#include <boost/asio/write.hpp> 
#include <boost/asio/placeholders.hpp> 
//#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <iostream> 

using boost::asio::deadline_timer; 
using boost::asio::ip::tcp; 

// 
// This class manages socket timeouts by applying the concept of a deadline. 
// Some asynchronous operations are given deadlines by which they must complete. 
// Deadlines are enforced by an "actor" that persists for the lifetime of the 
// tcp_client object: 
// 
// +----------------+ 
// |    | 
// | check_deadline |<---+ 
// |    | | 
// +----------------+ | async_wait() 
//    |   | 
//    +---------+ 
// 
// If the deadline actor determines that the deadline has expired, the socket 
// is closed and any outstanding operations are consequently cancelled. 
// 
// Connection establishment involves trying each endpoint in turn until a 
// connection is successful, or the available endpoints are exhausted. If the 
// deadline actor closes the socket, the connect actor is woken up and moves to 
// the next endpoint. 
// 
// +---------------+ 
// |    | 
// | start_connect |<---+ 
// |    | | 
// +---------------+ | 
//   |   | 
// async_- | +----------------+ 
// connect() | |    | 
//   +--->| handle_connect | 
//    |    | 
//    +----------------+ 
//       : 
// Once a connection is  : 
// made, the connect  : 
// actor forks in two -  : 
//       : 
// an actor for reading  :  and an actor for 
// inbound messages:  :  sending heartbeats: 
//       : 
// +------------+   :   +-------------+ 
// |   |<- - - - -+- - - - ->|    | 
// | start_read |      | start_write |<---+ 
// |   |<---+    |    | | 
// +------------+ |    +-------------+ | async_wait() 
//   |   |      |   | 
// async_- | +-------------+  async_- | +--------------+ 
// read_- | |    |  write() | |    | 
// until() +--->| handle_read |    +--->| handle_write | 
//    |    |     |    | 
//    +-------------+     +--------------+ 
// 
// The input actor reads messages from the socket, where messages are delimited 
// by the newline character. The deadline for a complete message is 30 seconds. 
// 
// The heartbeat actor sends a heartbeat (a message that consists of a single 
// newline character) every 10 seconds. In this example, no deadline is applied 
// message sending. 
// 
class tcp_client 
{ 
    public: 

     tcp_client(boost::asio::io_service& io_service, std::vector<std::string> * requests , std::vector<std::string> * responses , std::vector<std::string> * eots, unsigned int request_timeout = 30, unsigned int connect_timeout = 10) 
     : stopped_(false), 
      socket_(io_service), 
      deadline_(io_service), 
      heartbeat_timer_(io_service), 
      requests(requests), 
      responses(responses), 
      eots(eots), 
      request_timeout(request_timeout), 
      connect_timeout(connect_timeout) 
     { 
     if(eots->size()==0) 
     { 
      for(unsigned long i=0 ; i<(requests->size()-1); i++) 
      { 
       eots->push_back("\n"); 
      } 
      eots->push_back(""); 
     } 
     if(responses->size()==0) 
     { 
      responses->resize(requests->size()); 
     } 
     if((eots->size() != requests->size()) || (requests->size() != responses->size())) 
     { 
      std::cerr<<std::endl<<"wrong nr of parameters"<<std::endl; 
      return; 
     } 
     } 

     // Called by the user of the tcp_client class to initiate the connection process. 
     // The endpoint iterator will have been obtained using a tcp::resolver. 
     void start(tcp::resolver::iterator endpoint_iter) 
     { 
     // Start the connect actor. 
     start_connect(endpoint_iter); 

     // Start the deadline actor. You will note that we're not setting any 
     // particular deadline here. Instead, the connect and input actors will 
     // update the deadline prior to each asynchronous operation. 
     deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this)); 
     } 

     // This function terminates all the actors to shut down the connection. It 
     // may be called by the user of the tcp_client class, or by the class itself in 
     // response to graceful termination or an unrecoverable error. 
     void stop() 
     { 
     stopped_ = true; 
     boost::system::error_code ignored_ec; 
     socket_.close(ignored_ec); 
     deadline_.cancel(); 
     heartbeat_timer_.cancel(); 
     } 

    private: 
     void start_connect(tcp::resolver::iterator endpoint_iter) 
     { 
     if (endpoint_iter != tcp::resolver::iterator()) 
     { 
      std::cout << "Trying " << endpoint_iter->endpoint() << "...\n"; 

      // Set a deadline for the connect operation. 
      deadline_.expires_from_now(boost::posix_time::seconds(60)); 

      // Start the asynchronous connect operation. 
      socket_.async_connect(endpoint_iter->endpoint(), 
       boost::bind(&tcp_client::handle_connect, 
       this, _1, endpoint_iter)); 
     } 
     else 
     { 
      // There are no more endpoints to try. Shut down the client. 
      stop(); 
     } 
     } 
     void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter) 
     { 
     if (stopped_) 
      return; 

     // The async_connect() function automatically opens the socket at the start 
     // of the asynchronous operation. If the socket is closed at this time then 
     // the timeout handler must have run first. 
     if (!socket_.is_open()) 
     { 
      std::cout << "Connect timed out\n"; 

      // Try the next available endpoint. 
      start_connect(++endpoint_iter); 
     } 

     // Check if the connect operation failed before the deadline expired. 
     else if (ec) 
     { 
      std::cout << "Connect error: " << ec.message() << "\n"; 

      // We need to close the socket used in the previous connection attempt 
      // before starting a new one. 
      socket_.close(); 

      // Try the next available endpoint. 
      start_connect(++endpoint_iter); 
     } 

     // Otherwise we have successfully established a connection. 
     else 
     { 
      std::cout << "Connected to " << endpoint_iter->endpoint() << "\n"; 
      boost::asio::socket_base::keep_alive option(true); 
      socket_.set_option(option); 

      //~ // Start the input actor. 
      //~ start_read(); 

      //~ // Start the heartbeat actor. 
      //~ start_write(); 
      deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout)); 
      do_Requests_write(); 


     } 
     } 


     void handle_Requests_finish() 
     { 
     if(last_request<requests->size()) 
     { 
      last_request++; 
      do_Requests_write(); 
     }else 
     { 
      stop(); 
     } 
     } 

     void do_Requests_write() 
     { 
     if (stopped_) 
      return; 

     // Start an asynchronous operation to send a heartbeat message. 
     boost::asio::async_write(
      socket_ 
      , boost::asio::buffer(requests->at(last_request)+"\n") 
      , boost::bind(&tcp_client::do_Requests_write_handle, this, _1) 
      ); 
     } 

     void do_Requests_write_handle(const boost::system::error_code& ec) 
     { 
     if (stopped_) 
      return; 

     if (!ec) 
     { 
      do_Requests_read(); 
     } 
     else 
     { 
      std::cout << "Error do_Requests_write_handle: " << ec.message() << "\n"; 

      stop(); 
     } 
     } 

     void do_Requests_read() 
     { 
     // Set a deadline for the read operation. 
     deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout)); 

     // Start an asynchronous operation to read a newline-delimited message. 
     if(eots->at(last_request)=="") // read untill end 
     { 
      boost::asio::async_read(
       socket_ 
       , input_buffer_ 
       , boost::asio::transfer_at_least(1) // read untill end or error 
       , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error) 
       ); 
     }else 
     { 
      boost::asio::async_read_until(
       socket_ 
       , input_buffer_ 
       , eots->at(last_request) // read untill current request end of transmission sign/string or error 
       , boost::bind(&tcp_client::do_Requests_read_handle, this, _1) 
       ); 
     } 
     } 

     void do_Requests_read_handle(const boost::system::error_code& ec) 
     { 
     if (stopped_) 
      return; 

     if (!ec) 
     { 
      // Extract the newline-delimited message from the buffer. 
      //~ std::string line; 
      //~ std::istream is(&input_buffer_); 
      //~ std::getline(is, line); 
      std::istream response_istream(&input_buffer_); 
      std::string response; 
      response_istream >> response; 

      // Empty messages are heartbeats and so ignored. 
      std::cout << "Received: " << response << "\n"; 
      responses->at(last_request)+=response+"\n"; 
      //~ if (!line.empty()) 
      //~ { 
      //~ std::cout << "Received: " << line << "\n"; 
      //~ } 

      do_Requests_read(); 
     } 
     else 
     { 
      std::cout<<(std::string)"Error on receive: " + ec.message() + "\n"; 
      responses->at(last_request)+= (std::string)"Error on receive: " + ec.message() + "\n"; 
      handle_Requests_finish(); 
     } 
     } 

     void check_deadline() 
     { 
     if (stopped_) 
      return; 

     // Check whether the deadline has passed. We compare the deadline against 
     // the current time since a new asynchronous operation may have moved the 
     // deadline before this actor had a chance to run. 
     if (deadline_.expires_at() <= deadline_timer::traits_type::now()) 
     { 
      // The deadline has passed. The socket is closed so that any outstanding 
      // asynchronous operations are cancelled. 
      socket_.close(); 

      // There is no longer an active deadline. The expiry is set to positive 
      // infinity so that the actor takes no action until a new deadline is set. 
      deadline_.expires_at(boost::posix_time::pos_infin); 
     } 

     // Put the actor back to sleep. 
     deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this)); 
     } 

    private: 
     bool stopped_; 
     tcp::socket socket_; 
     boost::asio::streambuf input_buffer_; 
     deadline_timer deadline_; 
     deadline_timer heartbeat_timer_; 
     std::vector<std::string> *requests, *responses, *eots; 
     unsigned int last_request=0; 
     unsigned int request_timeout = 30; 
     unsigned int connect_timeout = 10; 
}; 

int main(int argc, char* argv[]) 
{ 
    std::vector<std::string> requests, responses, eots; 
    try 
    { 
    if (argc < 4) 
    { 
     std::cerr << "Usage: tcp_client <host> <port> <query1> <query2> <query3> [..]\n"; 
     return 1; 
    } 
    for(int i = 3; i<argc ; i++) 
    { 
     requests.push_back(argv[i]); 
     eots.push_back(""); 
     responses.push_back(""); 
    } 

    boost::asio::io_service io_service; 
    tcp::resolver r(io_service); 
    tcp_client c(io_service,&requests,&responses,&eots); 

    c.start(r.resolve(tcp::resolver::query(argv[1], argv[2]))); 

    io_service.run(); 
    } 
    catch (std::exception& e) 
    { 
    std::cerr << "Exception: " << e.what() << "\n"; 
    } 

    return 0; 
} 
#endif // MK_ASYNC_TCP_HPP 

回答

5

async_read()操作用,因为文件的末尾已经达到,而不是因为第一\n已经从字字input_buffer_读取时达到boost::asio::error::eof错误代码完成。

whois.iana.org:43上对com的响应是1830字节。

$ nc whois.iana.org 43 | wc --bytesenter 
comenter 
1830

boost::asio::streambuf被提供到读操作,它将尝试分配到其中的数据可以被读出未指定的大小的缓冲区。 current implementation将尝试分配大小为512的缓冲区。因此,如果接收到1830字节,并且每个读取操作读取的最大缓冲区大小为512字节,则在第4次读取操作时将读取所有接收到的字节。因此,第5次读取操作将导致文件结束。

async_read_until()的完成条件会导致稍微不同的行为。当streambuf包含指定的分隔符或发生错误时,此操作被视为完成。当async_read_until()完成时,streambuf可能包含分隔符之外的其他数据。如果streambuf的附加数据包含分隔符,则后续对async_read_until()的调用将满足其完成条件,而无需调用AsyncReadStreamasync_read_some()函数。

+0

结合使用你的解决方案,所以... async_read()做它的工作,但我读错了(和屏幕上的输出停止后读)? –

+1

@clickstefan正确。当'async_read()'操作读入'input_buffer_'多于一个字时,完成句柄只从'input_buffer_'中提取一个单词。 –

+0

和'async_read_until()'有某种排队,因为它没有写入任何东西,直到input_buffer_'为空,因此让我认为错误是'async_read()' –

1

它要读取整个传输是在收到时,请尝试使用socket_.async_read_some和一个数组或向量更换输入缓冲区。见Short reads and writes

+0

我已经结束了与'async_read_until()' –

相关问题