2017-04-03 45 views
0

是的。我知道boost::asio这个time_out有一些问题。我的问题可能太简单了,asio家伙要在这里解决。boost:asio :: read或boost:asio :: async_read超时

我正在使用TCP协议上的boost::asio以尽可能快的速度通过网络在网络中连续读取数据。

以下函数ReadData()在while循环中从工作人员std::thread不断调用。

std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) { 

boost::system::error_code error_code; 
buffer.resize(size_to_read); 

// Receive body 
std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code); 

if (bytes_read == 0) { 
    // log error 
    return; 
} 

return bytes_read; 
} 

它工作正常。返回数据。一切都很好。

所有我想要的,是使用TIME_OUTboost::asio::read。我了解到,我需要使用boost::asio::async_readboost::asio::async_wait以使time_out技术起作用。

其中一个boost example建议使用boost::asio::async_read_until

我应该用boost::asio::async_read还是boost::asio::async_read_until

无论我使用的是boost::asio::async_read还是boost::asio::async_read_untilboost::asio::read。但我希望在调用我的方法ReadData时完成asio::read调用&,以便客户端代码不受影响。

我该如何做到这一点?请建议

+0

你知道你可以用'插座::取消()'取消异步操作,对吧? –

+0

是的。我知道如果达到time_out,我应该取消套接字。但是,我如何在异步阅读中首先使用time_out? –

+0

@ TheQuantumPhysicist。套接字:: cancel()会在同步读操作上工作吗? –

回答

1

OK,这样的事情应该适合你的目的:

理由:

你似乎想用阻塞操作。既然如此,那么很可能你没有运行一个线程来抽取io循环。

所以我们揭开序幕插座的IO环两个同步异步任务,然后生成一个线程来:

一)复位(实际上是重新启动)的情况下,循环它已经被用尽

b)运行循环至力竭(我们可能是聪明这里只能运行,直到处理程序已经表示,一些条件已经满足,但是这是另一天的课)

#include <type_traits> 

template<class Stream, class ConstBufferSequence, class Handler> 
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler) 
{ 
    using handler_type = std::decay_t<Handler>; 
    using buffer_sequence_type = std::decay_t<ConstBufferSequence>; 
    using stream_type = Stream; 

    struct state_machine : std::enable_shared_from_this<state_machine> 
    { 
     state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler) 
       : stream_(stream) 
       , sequence_(std::move(sequence)) 
       , handler_(std::move(handler)) 
     {} 
     void start(std::size_t millis) 
     { 
      timer_.expires_from_now(boost::posix_time::milliseconds(millis)); 
      timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) { 
       self->handle_timeout(ec); 
      })); 
      boost::asio::async_read(stream_, sequence_, 
            strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){ 
       self->handle_read(ec, size); 
      })); 
     } 

     void handle_timeout(boost::system::error_code const& ec) 
     { 
      if (not ec and not completed_) 
      { 
       boost::system::error_code sink; 
       stream_.cancel(sink); 
      } 
     } 

     void handle_read(boost::system::error_code const& ec, std::size_t size) 
     { 
      assert(not completed_); 
      boost::system::error_code sink; 
      timer_.cancel(sink); 
      completed_ = true; 
      handler_(ec, size); 
     } 

     stream_type& stream_; 
     buffer_sequence_type sequence_; 
     handler_type handler_; 
     boost::asio::io_service::strand strand_ { stream_.get_io_service() }; 
     boost::asio::deadline_timer timer_ { stream_.get_io_service() }; 
     bool completed_ = false; 
    }; 

    auto psm = std::make_shared<state_machine>(stream, 
               std::forward<ConstBufferSequence>(sequence), 
               std::forward<Handler>(handler)); 
    psm->start(millis); 
} 

std::size_t ReadData(boost::asio::ip::tcp::socket& socket, 
        std::vector<unsigned char> & buffer, 
        unsigned int size_to_read, 
        boost::system::error_code& ec) { 

    buffer.resize(size_to_read); 

    ec.clear(); 
    std::size_t bytes_read = 0; 
    auto& executor = socket.get_io_service(); 
    async_read_with_timeout(socket, boost::asio::buffer(buffer), 
          2000, // 2 seconds for example 
          [&](auto&& err, auto size){ 
     ec = err; 
     bytes_read = size; 
    }); 

    // todo: use a more scalable executor than spawning threads 
    auto future = std::async(std::launch::async, [&] { 
     if (executor.stopped()) { 
      executor.reset(); 
     } 
     executor.run(); 
    }); 
    future.wait(); 

    return bytes_read; 
} 
+0

这看起来非常接近我所需要的。是的,你是对的。我的读取方法是在一个工作线程(这是'std :: thread')循环运行。但我不熟悉股票。我必须阅读它。 –

+0

什么是'stream_type'这里?错误表示成员引用基类型'stream_type'(又名'int(int,int,int)')不是结构或联合boost :: asio :: io_service :: strand strand_ {stream_.get_io_service()}; –

+0

只能检查一次吗?答案可能有小错误吗?我可以像'stream_cancel(sink)'这样在'stream'上调用'cancel'吗?应该在套接字上调用cancel。对 ? –

相关问题