2017-05-07 38 views
0

我正在研究一个简单的TCP服务器,它读取并将消息写入线程安全队列。然后应用程序可以使用这些队列安全地读取和写入套接字,即使是从不同的线程。加速asio异步读取和使用队列写入套接字

我面临的问题是我不能async_read。我的队列有pop操作,它返回下一个要处理的元素,但是如果没有可用的元素则会阻止它。所以一旦我呼叫POP async_read回调当然不会被解雇了。有没有一种方法可以将这样的队列整合到boost asio中,还是必须完全重写?

下面是我用来展示我遇到的问题的一个简短示例。一旦建立了TCP连接,我将创建一个新的线程,该线程将在该tcp_connection下运行该应用程序。之后我想开始async_readasync_write。我一直在这里打破了我的头几个小时,我真的不知道如何解决这个问题。

class tcp_connection : public std::enable_shared_from_this<tcp_connection> 
{ 
public: 
    static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) { 
     return std::shared_ptr<tcp_connection>(new tcp_connection(io_service)); 
    } 

    boost::asio::ip::tcp::socket& get_socket() 
    { 
     return this->socket; 
    } 

    void app_start() 
    { 
     while(1) 
     { 
      // Pop is a blocking call. 
      auto inbound_message = this->inbound_messages.pop(); 
      std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl; 
      this->outbound_messages.push(inbound_message); 
     } 
    } 

    void start() { 
     this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this()); 

     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 

     // Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks 
     // empty() is also available to check whether the queue is empty. 
     // So how can I async write without blocking the read. 
     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_read(const boost::system::error_code& e, size_t bytes_read) 
    { 
     std::cout << "handle_read called" << std::endl; 
     if (e) 
     { 
      std::cout << "Error handle_read: " << e.message() << std::endl; 
      return; 
     } 
     if (bytes_read != 0) 
     { 
      std::istream istream(&this->input_stream); 
      std::string message; 
      message.resize(bytes_read); 
      istream.read(&message[0], bytes_read); 
      std::cout << "Got message: " << message << std::endl; 
      this->inbound_messages.push(message); 
     } 
     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/) 
    { 
     if (e) 
     { 
      std::cout << "Error handle_write: " << e.message() << std::endl; 
      return; 
     } 

     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 



private: 
    tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service) 
    { 
    } 

    boost::asio::ip::tcp::socket socket; 
    boost::asio::strand strand; 
    boost::asio::streambuf input_stream; 

    std::thread app_thread; 

    concurrent_queue<std::string> inbound_messages; 
    concurrent_queue<std::string> outbound_messages; 
}; 

class tcp_server 
{ 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001)) 
    { 
     start_accept(); 
    } 

private: 
    void start_accept() 
    { 
     std::shared_ptr<tcp_connection> new_connection = 
      tcp_connection::create(acceptor.get_io_service()); 

     acceptor.async_accept(new_connection->get_socket(), 
      boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error)); 
    } 

    void handle_accept(std::shared_ptr<tcp_connection> new_connection, 
         const boost::system::error_code& error) 
    { 
     if (!error) 
     { 
      new_connection->start(); 
     } 

     start_accept(); 
    } 

    boost::asio::ip::tcp::acceptor acceptor; 
}; 

回答

2

,如果你想要一个async_pop方法,这需要一个错误消息占位符,并且回调处理程序在我看来。当您收到消息时,请检查是否有未完成的处理程序,如果有,请弹出消息,取消注册处理程序并调用它。同样,当注册async_pop时,如果已经有消息等待,请弹出消息并发送调用处理程序而不注册它。

您可能想从pop_operation或类似的多态基本库中派生async_pop类。

+0

谢谢!我没有想到我可以自己创建一个像这样的处理程序。就像我使用'strand.wrap'这样的lang应该可以正常工作。我有一个问题,但为什么我需要错误消息占位符? –

+0

@JohnSmith我的错误。你需要一个消息占位符(std :: string&?),你希望将来自上一次不成功的io读操作的任何错误代码传递回异步处理程序,以防消息队列耗尽并且读取错误。 –

+0

是的,我现在已经实施了。在代码中,它不是一个字符串,但用示例字符串演示是最简单的。 Tommorow我会在另一个答案中发布代码,但我会保持你的接受。只为那些想知道同样事情的人。 –