2013-04-26 73 views
1

我正在尝试让一个TCP聊天服务器与现有的聊天客户端通信。我无法访问现有聊天客户端的代码,但我知道它通过网络发送的内容以及我应该发回的内容。(标题:boost::asio::async_read 不读取后续数据包)

我试图使用boost::asio库,但由于某种原因,第二次boost::asio::async_read始终没有读到数据,我怎么也弄不明白原因。

我修改了boost::asio::async_read函数,在其中加入了printf("async_read\n");,只是为了确认它是否真的被调用——结果确实被调用了。

这里是我得到的输出:

Packet 1 (Header : Body Size): 
00000000 70 00           p. 

Packet 2 (Body : Auth Info): 
00000002 00 0c 59 38 3d 00 38 34 62 63 34 35 62 65 35 66 ..Y8=.84 bc45be5f 
00000012 36 33 34 34 35 66 63 33 38 32 34 32 31 61 39 39 63445fc3 82421a99 
00000022 31 66 37 66 39 62 00 31 30 35 2e 32 33 36 2e 33 1f7f9b.1 05.236.3 
00000032 35 2e 39 39 00 61 39 33 35 66 65 62 32 64 62 66 5.99.a93 5feb2dbf 
00000042 36 65 38 66 32 39 30 62 32 61 36 64 36 37 61 35 6e8f290b 2a6d67a5 
00000052 31 36 63 65 65 30 36 34 64 38 64 63 37 00 28 00 16cee064 d8dc7.(. 
00000062 00 00 81 06 01 00 77 61 63 00 03 00 09 00 00 00 ......wa c....... 

正如你在下面的输出中所看到的,服务器端的代码只接收到了第一个数据包:

[INFO] Adding Server 0.0.0.0:10013 
[INFO] Starting Servers 

[ROOM] Client Joined (#1) 
async_read 
[HEADER] \x70 
[HEADER] 112 
async_read 
[ROOM] Client Left (#1) 

客户端紧接着连续发送这两个数据包,然后服务器就停在了第二次boost::asio::async_read调用处。

这里是修改后的代码:

// Double-ended queue of chat messages; used below for the room's recent-message
// history and for each session's queue of outgoing messages.
typedef std::deque<ChatMessage> ChatMessageQueue;

// Abstract receiver of chat messages. Concrete participants implement
// Deliver() to accept a message handed to them.
class ChatParticipant
{
public:
    // Hands Message to this participant; must be implemented by subclasses.
    virtual void Deliver(const ChatMessage &Message) = 0;

    // Virtual so that implementations can be destroyed through a
    // ChatParticipant pointer.
    virtual ~ChatParticipant()
    {
    }
};

// Shared-ownership handle to a participant; this is the type ChatRoom stores
// and accepts in join()/leave().
typedef boost::shared_ptr<ChatParticipant> ChatParticipantPtr;

// Keeps the set of current participants plus a bounded history of recent
// messages, and forwards every delivered message to all participants.
class ChatRoom
{
public:
    // Adds Participant to the room, logs the new participant count, then
    // replays the stored recent messages to it through Deliver().
    void join(ChatParticipantPtr Participant)
    {
        m_Participants.insert(Participant);
        // size() yields a size_t; cast it so the argument really matches the
        // conversion specifier (passing size_t to "%i" is undefined behaviour).
        printf("[ROOM] Client Joined (#%lu)\n",
            static_cast<unsigned long>(m_Participants.size()));
        std::for_each(m_RecentMessages.begin(), m_RecentMessages.end(),
            boost::bind(&ChatParticipant::Deliver, Participant, _1));
    }

    // Removes Participant from the room. The logged number is how many set
    // entries matched Participant (1 if it was a member, 0 otherwise), not the
    // number of participants that remain.
    void leave(ChatParticipantPtr Participant)
    {
        // count() also yields a size_t; same cast as in join().
        printf("[ROOM] Client Left (#%lu)\n",
            static_cast<unsigned long>(m_Participants.count(Participant)));
        m_Participants.erase(Participant);
    }

    // Appends Message to the history, trims the history down to
    // MaxRecentMessages entries (oldest first), and hands Message to every
    // current participant.
    void deliver(const ChatMessage &Message)
    {
        m_RecentMessages.push_back(Message);
        while(m_RecentMessages.size() > MaxRecentMessages)
            m_RecentMessages.pop_front();

        // boost::ref avoids copying Message into the bound functor.
        std::for_each(m_Participants.begin(), m_Participants.end(),
            boost::bind(&ChatParticipant::Deliver, _1, boost::ref(Message)));
    }

private:
    std::set<ChatParticipantPtr> m_Participants;
    enum { MaxRecentMessages = 100 };   // upper bound on the stored history
    ChatMessageQueue m_RecentMessages;
};

// One connected client. After Start() it joins the room and then alternates
// between reading a header of ChatMessage::HeaderLength bytes and a body of
// m_ReadMessage.length() bytes. Outgoing messages are queued in
// m_WriteMessages and written one at a time.
//
// Every asynchronous operation binds shared_from_this() into its handler, so
// the session object stays alive while an operation is outstanding.
class ChatSession : public ChatParticipant, public boost::enable_shared_from_this<ChatSession>
{
public:
    ChatSession(boost::asio::io_service &IOService, ChatRoom &Room) : m_Socket(IOService), m_Room(Room)
    {
    }

    // Socket owned by this session; the server passes it to async_accept.
    tcp::socket &Socket()
    {
        return m_Socket;
    }

    // Joins the room and queues the first header read.
    void Start()
    {
        m_Room.join(shared_from_this());
        ReadHeader();
    }

    // Queues Message for sending. A write is started only when the queue was
    // empty beforehand; otherwise Handle_Write picks the message up once the
    // writes ahead of it have completed.
    void Deliver(const ChatMessage &Message)
    {
        const bool write_in_progress = !m_WriteMessages.empty();
        m_WriteMessages.push_back(Message);
        if(!write_in_progress)
            WriteFront();
    }

    // Completion handler for the header read. On success decodes the header
    // and queues a read for the body; on any failure the session leaves the
    // room and queues no further read.
    void Handle_ReadHeader(const boost::system::error_code &Error)
    {
        if(Error)
        {
            Disconnect("header read", Error);
            return;
        }

        if(!m_ReadMessage.DecodeHeader())
        {
            // Previously this case fell through silently: no read was queued
            // and the session stayed registered in the room.
            fprintf(stderr, "[SESSION] invalid message header, dropping client\n");
            m_Room.leave(shared_from_this());
            return;
        }

        printf("[HEADER] %s\n", Strings::StringToHex(m_ReadMessage.data()).c_str());
        // length()'s return type is not visible here; cast so the argument
        // matches "%i" whatever integer type it is.
        printf("[HEADER] %i\n", static_cast<int>(m_ReadMessage.length()));

        // NOTE(review): async_read completes only once the whole buffer has
        // been filled or an error occurs. If length() counts more bytes than
        // the peer sends for the body (for example because the wire length
        // includes the header bytes -- TODO confirm against the protocol),
        // this read ends with an error when the peer disconnects.
        boost::asio::async_read(m_Socket,
            boost::asio::buffer(m_ReadMessage.body(), m_ReadMessage.length()),
            boost::bind(&ChatSession::Handle_ReadBody, shared_from_this(), boost::asio::placeholders::error));
    }

    // Completion handler for the body read. Logs the body and queues the next
    // header read; on error the session leaves the room.
    void Handle_ReadBody(const boost::system::error_code &Error)
    {
        if(Error)
        {
            Disconnect("body read", Error);
            return;
        }

        printf("[BODY] %s\n", Strings::StringToHex(m_ReadMessage.body()).c_str());
        //m_Room.deliver(m_ReadMessage);
        ReadHeader();
    }

    // Completion handler for a write. Drops the message that was just sent
    // and starts writing the next queued one, if any; on error the session
    // leaves the room.
    void Handle_Write(const boost::system::error_code &Error)
    {
        if(Error)
        {
            Disconnect("write", Error);
            return;
        }

        m_WriteMessages.pop_front();
        if(!m_WriteMessages.empty())
            WriteFront();
    }

private:
    // Queues an asynchronous read of exactly ChatMessage::HeaderLength bytes
    // into the start of m_ReadMessage.
    void ReadHeader()
    {
        boost::asio::async_read(m_Socket,
            boost::asio::buffer(m_ReadMessage.data(), ChatMessage::HeaderLength),
            boost::bind(&ChatSession::Handle_ReadHeader, shared_from_this(), boost::asio::placeholders::error));
    }

    // Queues an asynchronous write of the message at the front of
    // m_WriteMessages. The queue must not be empty.
    void WriteFront()
    {
        boost::asio::async_write(m_Socket,
            boost::asio::buffer(m_WriteMessages.front().data(), m_WriteMessages.front().length()),
            boost::bind(&ChatSession::Handle_Write, shared_from_this(), boost::asio::placeholders::error));
    }

    // Reports why the operation named by Stage failed and removes this
    // session from the room. The error used to be discarded, which hid the
    // reason a client was dropped.
    void Disconnect(const char *Stage, const boost::system::error_code &Error)
    {
        fprintf(stderr, "[SESSION] %s failed: %s\n", Stage, Error.message().c_str());
        m_Room.leave(shared_from_this());
    }

    tcp::socket m_Socket;
    ChatRoom &m_Room;
    ChatMessage m_ReadMessage;          // buffer for the message currently being read
    ChatMessageQueue m_WriteMessages;   // outgoing messages; front() is the one in flight
};

// Shared-ownership handle to a session; ChatServer creates one per pending accept.
typedef boost::shared_ptr<ChatSession> ChatSessionPtr;

class ChatServer 
{ 
public: 
    ChatServer(boost::asio::io_service &IOService, const tcp::endpoint &EndPoint) : m_IOService(IOService), m_Acceptor(IOService, EndPoint) 
    { 
     StartAccepting(); 
    } 

    void StartAccepting() 
    { 
     ChatSessionPtr NewSession(new ChatSession(m_IOService, m_Room)); 
     m_Acceptor.async_accept(NewSession->Socket(), 
      boost::bind(&ChatServer::Handle_Accept, this, NewSession, boost::asio::placeholders::error)); 
    } 

    void Handle_Accept(ChatSessionPtr Session, const boost::system::error_code& Error) 
    { 
     if(!Error) 
      Session->Start(); 

     StartAccepting(); 
    } 

private: 
    boost::asio::io_service &m_IOService; 
    tcp::acceptor m_Acceptor; 
    ChatRoom m_Room; 
}; 

// Shared-ownership handle to a server; main() holds the server through it.
typedef boost::shared_ptr<ChatServer> ChatServerPtr;

// Entry point: creates one chat server on TCP port 10013 (IPv4) and runs the
// io_service event loop on the calling thread.
int main(int argc, char **argv, char **envp)
{
    boost::asio::io_service IOService;

    tcp::endpoint EndPoint(tcp::v4(), 10013);
    // Constructing the server already queues its first asynchronous accept.
    ChatServerPtr Server(new ChatServer(IOService, EndPoint));
    printf("[INFO] Adding Server %s:%i\n", EndPoint.address().to_string().c_str(), EndPoint.port());

    // All accept/read/write handlers are invoked from inside this call.
    IOService.run();

    // Was `return TRUE;`, which reports exit status 1 (failure) after a normal
    // shutdown and depends on a non-standard macro. 0 signals success.
    return 0;
}

你能看出我哪里做错了吗?

回答

0

TCP是面向字节的流,API中没有“数据包”的概念。根据您的调试输出,您的第一次读取读的是112个字节,也就是说,您在一次调用中就把消息头和消息体一起读了。

你应该先读取两个字节的长度字段,也就是用一个两字节的缓冲区调用async_read。当它返回时,解码这个长度,并用它来确定消息体需要读取多少字节;这个字节数就是下一次调用async_read时通过缓冲区参数传入的大小。

+0

我确实是先读取2个字节的长度。我的第一次读取得到了'\x70',它在调用DecodeHeader时被解码,得到112,也就是其余数据的长度。 – 2013-04-30 22:50:41

相关问题