2013-07-15 80 views
1

我的服务器应用程序有一个奇怪的问题。我的系统很简单:我有1个以上的设备和一个通过网络进行通信的服务器应用程序。协议具有可变长度的二进制包,但具有固定标头(包含有关当前包大小的信息)。数据包示例:EOF boost :: async_read thread_pull和boost 1.54

char pct[maxSize] = {} 
pct[0] = 0x5a //preambule 
pct[1] = 0xa5 //preambule 
pct[2] = 0x07 //packet size 
pct[3] = 0x0A //command 
... [payload] 

该协议建立在命令回答的原则基础上。

我使用boost :: ASIO用于通信 - io_service对象与拉线(4个线程)+异步读/写操作(下面的代码示例)并创建一个“查询周期” - 每个200毫秒计时器由:

  • 查询从设备中的一个值
  • 获取结果,查询第二个值
  • 获取结果,启动定时器再次

这项工作非常出色升压1.53(Debug和Release)。但后来我改用1.54(特别是在释放模式下)的魔法开始。我的服务器成功启动,连接到设备并启动“查询周期”。大约30-60秒,一切正常(我收到数据,数据是正确的),但是然后我开始在最后一个读取句柄上接收asio :: error(总是在一个地方)。错误类型:EOF。收到错误后,我必须断开设备连接。

一段时间的谷歌搜索给我关于EOF的信息表明另一侧(在我的情况下设备)启动断开程序。但是,根据设备的逻辑,它不是真的。 可能有人解释发生了什么事?可能是我需要设置一些套接字选项或定义?我看到两个可能的原因:

  • 我的方面init断开(有一些原因,我不知道)和EOF是这个行动的答案。
  • 某些套接字超时触发。

我的环境:

  • 操作系统:Windows 7/8
  • 编译器:MSVC 2012更新3

主 “查询周期” 的示例代码。改编自官方boost chat example所有代码都简化了减少空间:)

  • SocketWorker - 对于插座低水平包装
  • DeviceWorker - 类设备通信
  • ERES - 错误店面内部结构
  • ProtoCmd和ProtoAnswer - 用于原始数组命令和回答的包装(chat_message 模拟从boost chat example
  • lw_service_proto命名空间 - 预定义命令和数据包的最大大小

因此,代码示例。插座包装:

namespace b = boost; 
namespace ba = boost::asio; 

typedef b::function<void(const ProtoAnswer answ)> DataReceiverType; 

class SocketWorker 
{ 
private: 
    typedef ba::ip::tcp::socket socketType; 
    typedef std::unique_ptr<socketType> socketPtrType; 
    socketPtrType devSocket; 
    ProtoCmd  sendCmd; 
    ProtoAnswer rcvAnsw; 

    //[other definitions] 

public: 

//--------------------------------------------------------------------------- 
ERes SocketWorker::Connect(/*[connect settings]*/) 
{ 
    ERes res(LGS_RESULT_ERROR, "Connect to device - Unknow Error"); 

    using namespace boost::asio::ip; 
    boost::system::error_code sock_error; 

    //try to connect 
    devSocket->connect(tcp::endpoint(address::from_string(/*[connect settings ip]*/), /*[connect settings port]*/), sock_error); 

    if(sock_error.value() > 0) { 
     //[work with error] 
     devSocket->close(); 
    } 
    else { 
     //[res code ok] 
    } 

    return res; 
} 
//--------------------------------------------------------------------------- 
ERes SocketWorker::Disconnect() 
{ 
    if (devSocket->is_open()) 
    { 
     boost::system::error_code ec; 
     devSocket->shutdown(bi::tcp::socket::shutdown_send, ec); 
     devSocket->close(); 
    } 
    return ERes(LGS_RESULT_OK, "OK"); 
} 

//--------------------------------------------------------------------------- 
//query any cmd 
void SocketWorker::QueryCommand(const ProtoCmd cmd, DataReceiverType dataClb) 
{ 
    sendCmd = std::move(cmd); //store command 
    if (sendCmd .CommandLength() > 0) 
    { 
     ba::async_write(*devSocket.get(), ba::buffer(sendCmd.Data(), sendCmd.Length()), 
         b::bind(&SocketWorker::HandleSocketWrite, 
           this, ba::placeholders::error, dataClb)); 
    } 
    else 
    { 
     cerr << "Send command error: nothing to send" << endl; 
    } 
} 

//--------------------------------------------------------------------------- 
// boost socket handlers 
void SocketWorker::HandleSocketWrite(const b::system::error_code& error, 
                DataReceiverType dataClb) 
{ 
    if (error) 
    { 
     cerr << "Send cmd error: " << error.message() << endl; 
     //[send error to other place] 
     return; 
    } 

    //start reading header of answer (lw_service_proto::headerSize == 3 bytes) 
    ba::async_read(*devSocket.get(), 
        ba::buffer(rcvAnsw.Data(), lw_service_proto::headerSize), 
        b::bind(&SocketWorker::HandleSockReadHeader, 
          this, ba::placeholders::error, dataClb)); 
} 
//--------------------------------------------------------------------------- 
//handler for read header 
void SocketWorker::HandleSockReadHeader(const b::system::error_code& error, DataReceiverType dataClb) 
{ 
    if (error) 
    { 
     //[error working] 
     return; 
    } 

    //decode header (check preambule and get full packet size) and read answer payload 
    if (rcvAnsw.DecodeHeaderAndGetCmdSize()) 
    { 
     ba::async_read(*devSocket.get(), 
        ba::buffer(rcvAnsw.Answer(), rcvAnsw.AnswerLength()), 
        b::bind(&SocketWorker::HandleSockReadBody, 
          this, ba::placeholders::error, dataClb)); 
    } 
} 
//--------------------------------------------------------------------------- 
//handler for andwer payload 
void SocketWorker::HandleSockReadBody(const b::system::error_code& error, DataReceiverType dataClb) 
{ 
    //if no error - send anwser to 'master' 
    if (!error){ 
     if (dataClb != nullptr) 
      dataClb(rcvAnsw); 
    } 
    else{ 
     //[error process] 

     //here i got EOF in release mode 
    } 
} 

}; 

设备工人

class DeviceWorker 
{ 
private: 
    const static int LW_QUERY_TIME = 200; 
    LWDeviceSocketWorker sockWorker; 
    ba::io_service& timerIOService; 
    typedef std::shared_ptr<ba::deadline_timer> TimerPtr; 
    TimerPtr  queryTimer; 
    bool   queryCycleWorking; 

    //[other definitions] 
public: 

ERes DeviceWorker::Connect() 
{ 
    ERes intRes = sockWorker.Connect(/*[connect settings here]*/); 

    if(intRes != LGS_RESULT_OK) { 
     //[set result to error] 
    } 
    else { 
     //[set result to success] 

     //start "query cycle" 
     StartNewCycleQuery(); 
    } 

    return intRes; 
} 
//--------------------------------------------------------------------------- 
ERes DeviceWorker::Disconnect() 
{ 
    return sockWorker.Disconnect(); 
} 
//--------------------------------------------------------------------------- 
void DeviceWorker::StartNewCycleQuery() 
{ 
    queryCycleWorking = true; 
    //start timer 
    queryTimer = make_shared<ba::deadline_timer>(timerIOService, bt::milliseconds(LW_QUERY_TIME)); 
    queryTimer->async_wait(boost::bind(&DeviceWorker::HandleQueryTimer, 
             this, boost::asio::placeholders::error)); 
} 
//--------------------------------------------------------------------------- 
void DeviceWorker::StopCycleQuery() 
{ 
    //kill timer 
    if (queryTimer) 
     queryTimer->cancel(); 

    queryCycleWorking = false; 
} 
//--------------------------------------------------------------------------- 
//timer handler 
void DeviceWorker::HandleQueryTimer(const b::system::error_code& error) 
{ 
    if (!error) 
    { 
     ProtoCmd cmd;  
     //query for first value 
     cmd.EncodeCommandCore(lw_service_proto::cmdGetAlarm, 1); 
     sockWorker.QueryCommand(cmd, boost::bind(&DeviceWorker::ReceiveAlarmCycle, 
           this, _1));  
    } 
} 
//--------------------------------------------------------------------------- 
//receive first value 
void DeviceWorker::ReceiveAlarmCycle(ProtoAnswer adata) 
{ 
    //check and fix last bytes (remove \r\n from some commands) 
    adata.CheckAndFixFooter(); 

    //[working with answer] 

    if (queryCycleWorking) 
    { 
     //query for second value 
     ProtoCmd cmd; 
     cmd.EncodeCommandCore(lw_service_proto::cmdGetEnergyLevel, 1); 
     sockWorker.QueryCommand(cmd, b::bind(&DeviceWorker::ReceiveEnergyCycle, 
             this, _1)); 
    } 
} 
//--------------------------------------------------------------------------- 
//receive second value 
void DeviceWorker::ReceiveEnergyCycle(ProtoAnswer edata) 
{ 
    //check and fix last bytes (remove \r\n from some commands) 
    edata.CheckAndFixFooter(); 

    //[working with second value] 

    //start new "query cycle" 
    if (queryCycleWorking) 
     StartNewCycleQuery(); 
} 

}; 

任何想法,欢迎:)

编辑: 几个测试后,我看到anower图片:

  • 这个问题仅在boost 1.54上重现(调试和释放模式,释放 - 更多更快),以提升1.53没有更多的错误(也许我不好清理我的代码,然后重建第一时间....)
  • 与提升1.54和1个线程(而不是4)所有的工作以及

我也花一些时间与调试器和升压源,使一些结论:

  • 当我收到EOF我的数据已经完全接收。
  • 这EOF表明什么在这次行动中转移,即插座结果标志为0时(没有错误),但提高操作标志,如果EOF(传输的字节== 0)

在这一刻我强制打开提升1.53 ...

+1

我承认我没有深入研究问题的描述......但一开始,缓冲区的一生对我来说是可疑的。特别是,你发送'buffer(cmd.Data(),cmd.Length())' - 其中'cmd'是一个本地对象,即缓冲区显然不会超过async.operation。同样的,'rcvAnsw'在什么地方被定义? –

+0

@IgorR。 我的不好,很抱歉:) SocketWorker中定义了一个命令和一个答案的本地对象,因此它将保留所有的异步操作时间。 但是对于本地“cmd”是一个很好的问题。出于某种原因,我认为缓冲区使发送数据的副本。尝试保存命令本地... PS:添加本地命令来源在主帖 – ShaKeSPeaR

+1

不,缓冲区()免费函数不会复制并且不拥有底层缓冲区,它只是适应' ConstBufferSequence'(或'MutableBufferSequence')概念。 http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.buffer_invalidation –

回答

0

我有完全相同的问题,我敢肯定,这是提升的一个bug :: ASIO 1.54.0

Here的错误报告。

该解决方案实际上回到1.53,尽管在bug报告页面有1.54的补丁可用。

+0

谢谢,已经回到1.53。我希望补丁包含在1.55 :) – ShaKeSPeaR

+0

只是抬头即将到来的[boost 1.55](http://www.boost.org/users/history/version_1_55_0.html)。看起来像它找到并修复与io_service +多线程池的Windows错误:) – ShaKeSPeaR

0

如果您的应用程序可以正常工作,但调用io_service::run()的单个线程却失败并且出现四个线程,您很可能会遇到竞争情况。这种类型的问题很难诊断。一般来说,您应该确保您的devSocket至多有一个未完成的async_read()async_write()操作。您当前执行的SocketWorker::QueryCommand()无条件调用async_write()可能违反顺序假设documented这样

此操作在零或多次调用方面落实到 流的async_write_some功能,被称为一个组成 操作。该程序必须确保该流不会执行写入操作(例如async_write,流的async_write_some 函数或执行写入的任何其他组合操作),直到该操作完成为 。

该问题的classic solution是维护传出消息队列。如果先前的写入未完成,请将下一个传出消息附加到队列中。当前一次写入完成时,为队列中的下一条消息启动async_write()。当使用多线程调用io_service::run()时,您可能需要使用链作为链接的答案。

+0

感谢您的回答。我知道“在时间上对套接字进行一次异步操作”的问题,而我目前的设计则是通过手动控制查询周期来避免这种情况。 'timer end-write cmd-read cmd -....- start timer')。 但无论如何我试图用'strand'重写我的代码,但没有运气 - 问题仍然存在。 我强制暂时切换到1.53。 – ShaKeSPeaR

+0

我也尝试用'\ r \ n'参数重写代码''async_read_until'(仅用于删除我的处理程序)。但结果是一样的 - 我得到'EOF'和'byte_transferred = 0',尽管我的缓冲区已经处理了正确的数据包,也就是说它已经被传输并且字节计数超过了0 .... – ShaKeSPeaR

+0

我刚刚花了周末时间调试一个类似问题与提升1.54。我找不到任何代码错误。安装1.55,并没有再次看到这个问题(它发生在我每次运行特定的测试,运行相同的测试超过100次,现在升压1.55,并没有看到问题)。 – regu

相关问题