2012-11-27 134 views
5

我想创建一个专用线程,专门用于使用boost库(asio)从UDP套接字接收数据。这个线程应该是由从UDP套接字接收到的一些数据触发的无限循环。在我的应用程序中,我需要使用异步接收操作。boost asio udp套接字async_receive_from不会调用处理程序

如果我使用同步函数receive_from,一切都按预期工作。

但是,如果我使用async_receive_from处理程序永远不会被调用。由于我使用信号量来检测是否收到了一些数据,因此程序会锁定并且从不触发循环。

我已验证(使用网络分析器)发件人设备在UDP套接字上正确发送数据。

我在下面的代码中隔离了这个问题。

#include <boost\array.hpp> 
#include <boost\asio.hpp> 
#include <boost\thread.hpp> 
#include <boost\interprocess\sync\interprocess_semaphore.hpp> 

#include <iostream> 

typedef boost::interprocess::interprocess_semaphore Semaphore; 

using namespace boost::asio::ip; 

class ReceiveUDP 
{ 
public: 

    boost::thread* m_pThread; 

    boost::asio::io_service   m_io_service; 
    udp::endpoint     m_local_endpoint; 
    udp::endpoint     m_sender_endpoint; 

    udp::socket      m_socket; 

    size_t  m_read_bytes; 
    Semaphore m_receive_semaphore; 

    ReceiveUDP() : 
     m_socket(m_io_service), 
     m_local_endpoint(boost::asio::ip::address::from_string("192.168.0.254"), 11), 
     m_sender_endpoint(boost::asio::ip::address::from_string("192.168.0.11"), 5550), 
     m_receive_semaphore(0) 
    { 
     Start(); 
    } 

    void Start() 
    { 
     m_pThread = new boost::thread(&ReceiveUDP::_ThreadFunction, this); 
    } 

    void _HandleReceiveFrom(
     const boost::system::error_code& error, 
     size_t         received_bytes) 
    { 
     m_receive_semaphore.post(); 

     m_read_bytes = received_bytes; 
    } 

    void _ThreadFunction() 
    { 
     try 
     { 
      boost::array<char, 100> recv_buf; 

      m_socket.open(udp::v4()); 
      m_socket.bind(m_local_endpoint); 
      m_io_service.run(); 

      while (1) 
      { 
#if 1 // THIS WORKS 

       m_read_bytes = m_socket.receive_from(
        boost::asio::buffer(recv_buf), m_sender_endpoint); 

#else // THIS DOESN'T WORK 

       m_socket.async_receive_from(
        boost::asio::buffer(recv_buf), 
        m_sender_endpoint, 
        boost::bind(&ReceiveUDP::_HandleReceiveFrom, this, 
        boost::asio::placeholders::error, 
        boost::asio::placeholders::bytes_transferred)); 

       /* The program locks on this wait since _HandleReceiveFrom 
       is never called. */ 
       m_receive_semaphore.wait(); 

#endif 

       std::cout.write(recv_buf.data(), m_read_bytes); 
      } 

      m_socket.close(); 
     } 
     catch (std::exception& e) 
     { 
      std::cerr << e.what() << std::endl; 
     } 
    } 
}; 

void main() 
{ 
    ReceiveUDP receive_thread; 

    receive_thread.m_pThread->join(); 
} 

在旗语甲TIMED_WAIT是优选,然而用于调试目的我使用了一个阻塞等待如在上面的代码。

我错过了什么吗?我的错误在哪里?

回答

7

您拨打io_service.run()正在退出,因为io_service没有工作要做。代码然后进入while循环并呼叫m_socket.async_receive_from。此时io_service未运行,它从不读取数据并调用处理程序。

你需要安排工作调用io_service对象运行前的事:

即:

// Configure io service 
ReceiveUDP receiver; 

m_socket.open(udp::v4()); 
m_socket.bind(m_local_endpoint); 
m_socket.async_receive_from(
    boost::asio::buffer(recv_buf), 
    m_sender_endpoint, 
    boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver, 
    boost::asio::placeholders::error, 
    boost::asio::placeholders::bytes_transferred)); 

处理函数将执行以下操作:

// start the io service 
void HandleReceiveFrom(
    const boost::system::error_code& error, 
    size_t received_bytes) 
{ 
    m_receive_semaphore.post(); 

    // schedule the next asynchronous read 
    m_socket.async_receive_from(
     boost::asio::buffer(recv_buf), 
     m_sender_endpoint, 
     boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver, 
     boost::asio::placeholders::error, 
     boost::asio::placeholders::bytes_transferred)); 

    m_read_bytes = received_bytes; 
} 

你的线程则只需等待对于信号量:

while (1) 
{ 
    m_receive_semaphore.wait(); 
    std::cout.write(recv_buf.data(), m_read_bytes); 
} 

备注:

  1. 你真的需要这个额外的线程吗?处理程序是完全异步的,boost :: asio可用于管理线程池(请参阅:think-async
  2. 请不要使用下划线,后跟大写字母表示变量/函数名称。他们保留。
+0

非常感谢!我根据你的建议修改了代码,一切正常。 我已经在线程创建之前配置了IO服务。到io_service.run()的调用只是线程创建后: \t无效的start() \t { \t \t m_socket.open(UDP :: V4()); \t \t m_socket.bind(m_local_endpoint); \t \t StartRead(); \t \t m_pThread = new boost :: thread(&ReceiveUDP :: _ ThreadFunction,this); \t \t m_io_service.run(); \t} 其中StartRead()是对async_receive_from的调用。 再次感谢。 – arms

+0

谢谢你,我疯了。 – Alex

0

m_io_service.run()立即返回,所以没人发送完成处理程序。请注意,io_service::run是一种基于asio的应用程序的“消息循环”,只要您希望asio功能可用,它就应该运行(这是一个简单的描述,但对于您的情况已经足够了)。

此外,您不应该在循环中调用async.operation。相反,在前一个完成处理程序中发出后续的async.operation,以确保2个async.reads不会同时运行。

请参阅asio示例以查看典型的asio应用程序设计。

相关问题