2017-04-05 109 views
0

我编写了一个异步boost :: asio TCP应用程序,它使用声明为std::vector<std::thread> mIOServicePool的线程池。这些线程异步读取和写入TCP数据到服务器。以下代码取自GUI的开始按钮事件处理程序。用未完成的完成处理程序重新启动tcp boost asio io_service

// launch multiple asio service threads to 
// handle the protocol instances - effectively 
// thread pooling the ioservice 
//mpIOService->reset(); 
for (auto i=0; i<3; i++) { 
mIOServicePool.emplace_back(
    std::thread([this]() { mpIOService->run(); })); 
} 

的代码与存储作为GUI的主窗口类的成员的mIOServicePool基于Qt的GUI应用程序的一部分。

当我启动应用程序并保持运行状态时,此工作正常,但是,在尝试重新启动与后端服务器的连接时,事情开始出错。问题很可能与未完成的处理程序有关,当我重置与io_service关联的io_service::work(当按下GUI停止按钮时),我认为该处理程序会被刷新。当我尝试在读取内存asio :: socket的流缓冲区时,通过访问冲突尝试启动TCP通信(至少在Windows上)时,问题就会显现出来。从下面的堆栈跟踪中可以看到,它正在处理与读取套接字关联的完成处理程序。

app739.exe!boost::asio::basic_streambuf<std::allocator<char> >::commit(unsigned __int64 n) Line 226 C++ 
app739.exe!boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >::operator()(const boost::system::error_code & ec, unsigned __int64 bytes_transferred, int start) Line 624 C++ 
app739.exe!boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>::operator()() Line 129 C++ 
app739.exe!boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, ...) Line 70 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > * this_handler) Line 685 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::win_iocp_socket_recv_op<boost::asio::mutable_buffers_1,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 97 C++ 
app739.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164 C++ 
app739.exe!boost::asio::io_service::run() Line 59 C++ 
app739.exe!MainWindow::on_pushButtonStart_clicked::__l13::<lambda>() Line 943 C++ 

answer表明该问题可能与io_service.reset()做。访问冲突时的堆栈跟踪显示该线程正在处理一个asio完成处理程序。我认为通过阅读其他帖子,解决这个问题的关键是要正确地将io_service.stop()io_service.reset()排序到boost::asio::io_service对象,在停止io_service或重置标记工作对象之前重置套接字也很重要。

下面的代码显示了如何尝试停止io_service线程,同时调试代码,我确实看到所有线程完成了他们的连接,所以我不明白为什么有完成处理程序超时。

void 
MainWindow::stopSys() 
{ 
    // make sure that we have no more work keeping services alive 
    mpWork.reset(); 
    // check to see if the protocol threads were started 
    if (mVCDUProtocol) { 
     // terminate protocol thread by setting the shared mShutdown atomic flag 
     mVCDUProtocol->shutdown(); 
     // Once each thread sees the shutdown flag, it will cleanly 
     // terminate so we can call join here to wait for the entire 
     // pool to finish 
     std::for_each(mIOServicePool.begin(), mIOServicePool.end(), 
      [](std::thread& rNext) { 
       rNext.join(); 
      }); 
     mIOServicePool.clear(); 
    } 
} 

我的代码如下所示非常简单。它启动了一个异步解析 - 这是在一个lambda处理程序中处理的。从那里,它调用start_async_ops(endPointIter)执行异步connect(),并从此lambda中,代码调用VCDUProtocol::do_read(),它执行boost::asio::async_read_until()来等待来自服务器的数据。

void 
VCDUProtocol::prosimAsyncIOThreadFn() 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    try { 
     // convert the host-name/port to a usable endpoint 
     tcp::resolver resolver(*mpIOService); 
     tcp::resolver::query query(mProtocolConfig.getProsimHostName(), 
      std::to_string(mProtocolConfig.getProsimPortNum())); 
     const auto endPointIter = std::find_if(
      resolver.resolve(query), tcp::resolver::iterator(), 
      [](const tcp::endpoint& next) { 
       return next.protocol() == tcp::v4(); 
      }); 
     if (endPointIter != tcp::resolver::iterator()) { 
      mpSocket = std::make_unique<tcp::socket>(*mpIOService); 
      mpSocketTimer = std::make_unique<deadline_timer>(*mpIOService); 
      start_async_ops(endPointIter); 
     } 
    } catch (std::exception& rEx) { 
     LOG_ERROR(gLogger, gChannel) << boost::format(
      "%1%: %2%") 
      % __FUNCTION__ 
      % rEx.what(); 
    } 
} 

void 
VCDUProtocol::start_async_ops(tcp::resolver::iterator endpoint_iter) 
{ 
    // Start the connect actor. 
    do_connect(endpoint_iter); 

    // Start the deadline actor. You will note that we're not setting any 
    // particular deadline here. Instead, the connect and input actors will 
    // update the deadline prior to each asynchronous operation. 
    mpSocketTimer->async_wait(boost::bind(
     &VCDUProtocol::check_deadline, this, _1)); 
} 

void 
VCDUProtocol::do_connect(
    tcp::resolver::iterator endpoint_iter) 
{ 
    if (endpoint_iter != tcp::resolver::iterator()) { 
     // Set a deadline for the connect operation to complete. 
     mpSocketTimer->expires_from_now(boost::posix_time::seconds(5)); 
     boost::asio::async_connect(*mpSocket, endpoint_iter, 
      [this](boost::system::error_code ec, tcp::resolver::iterator) { 
       if (!mShutdownFlag && !ec) { 
        // successfully connected here - cancel the 
        // connect timer and kick off async write ops 
        mpSocketTimer->cancel(); 
        // kick off the prosim read operation 
        do_read(); 
       } 
      }); 
    } else { 
     // No more endpoints. Close the socket. 
     shutdown(); 
    } 
} 

void 
VCDUProtocol::do_read() 
{ 
    // Start or continue an asynchronous line reads. This will read at least 
    // up to a carriage return or line feed 
    async_read_until(*mpSocket, *mTLS->mSocketStreamBuf, "\r\n", 
     boost::bind(&VCDUProtocol::handle_read, this, _1)); 
} 

这是异步读取完成处理 - 这需要取消,我读的地方,简单地关闭套接字不足以作为完井处理程序将不会被调用。我应该打电话给取消;

/** 
* Asynchronous read callback. 
* 
* @param ec  [in] Boost ASIO library error code. 
*/ 
void 
VCDUProtocol::handle_read(const boost::system::error_code& ec) 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    if (!mShutdownFlag) { 
     if (!ec) { 
      // Extract the newline-delimited message from the buffer. 
      std::string line; 
      std::istream is(mTLS->mSocketStreamBuf.get()); 
      while (std::getline(is, line)) { 
       // Critical Section 
       std::lock_guard<std::mutex> lock (gMutexGuard); 
       // handle partial line reads 
       if (is.eof()) { 
        mTLS->mPartialLine = std::move(line); 
        continue; 
       } else if (!mTLS->mPartialLine.empty()) { 
        line = std::move(mTLS->mPartialLine) + line; 
       } 
       . . . 
       // update GUI 
       mpListener->handlePageUpdate(
        mProtocolConfig.getCduID(), 
        mTLS->mVCDUPage, bRefreshCDU); 
       } 
       // not really required 
       line.clear(); 
      } 
      // keep reading 
      do_read(); 
     } else { 
      LOG_ERROR(gLogger, gChannel) << boost::format(
       "CDU_%1%: handle_read - error[%2%]") 
       % mProtocolConfig.getCduID() 
       % ec.message(); 
      shutdown(); 
     } 
    } 
} 

回答

1

重新启动TCP升压ASIO io_service对象有未完成的完成处理

AFAICT这是不可能的。

该文档明确指出在run()可以再次调用之前必须调用reset()

我认为唯一可行的选择是创建基于例如自己的事件循环。 poll_one(),从而防止首先停止服务。

这是异步读取完成处理程序 - 这个需要取消,我读了一个地方,只是关闭套接字是不够的,因为完成处理程序不会被调用。

这是不正确的。取消套接字将取消在飞行中的操作,并且他们导致以ec == operation_aborted调用完成处理程序。关闭套接字可能会导致不同的错误代码,如bad_socket

+0

谢谢,但花了大部分时间后,看着这看起来像我可能已经绊倒了一个解决方案 - 它也有道理 - 基本上伎俩似乎是优雅取消,关闭和关闭套接字之前调用重置上哨兵工作对象。正如你指出的那样,会导致handle_read处理程序被唤醒并发生错误 - 在这种情况下 - 由于线程退出或应用程序请求而导致I/O操作中止,然后我可以重新开始。 – johnco3

+0

也调用重置后的工作sentinel我调用重置io_service完全关闭它(不知道如果必要,因为线程池之前加入),然后再次开始之前,我确保io_service被重置(否则它会保持停止状态并立即退出run())并使用io_service重新初始化标记。 – johnco3

相关问题