2011-05-19 20 views
9

我一直在寻找有关如何处理TCP消息成帧的例子。我看到很多例子,其中NetworkStreams被传递到StreamReader或StreamWriter对象,然后使用ReadLine或WriteLine方法来处理'\ n'分隔的消息。我的应用程序协议包含以'\ n'结尾的消息,因此NetworkStream似乎是要走的路。然而,我无法找到任何具体的例子,以正确的方式处理所有这些与异步套接字的组合。在下面调用ReceiveCallback()时,如何实现NetworkStream和StreamReader类来处理消息成帧?根据我读到的内容,我可能会在一次接收中收到一封邮件的一部分,并在下一次收到邮件的其余部分(包括'\ n')。这是否意味着我可以得到一封邮件的结尾和下一封邮件的一部分?当然,必须有一个更简单的方法来处理这个问题。有关异步套接字操作和消息成帧的.NET问题

我有以下代码:

private void StartRead(Socket socket) 
    { 
     try 
     { 
      StateObject state = new StateObject(); 
      state.AsyncSocket = socket; 

      socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state); 
     } 
     catch (SocketException) 
     { 
      m_Socket.Shutdown(SocketShutdown.Both); 
      Disconnect(); 
     } 
    } 

    private void ReceiveCallback(IAsyncResult ar) 
    { 
     try 
     { 
      StateObject state = (StateObject)ar.AsyncState; 

      int bytes_read = state.AsyncSocket.EndReceive(ar); 

      char[] chars = new char[bytes_read + 1]; 
      System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder(); 
      int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0); 

      String data = new String(chars); 

      ParseMessage(data); 

      StartRead(state.AsyncSocket); 
     } 
     catch (SocketException) 
     { 
      m_Socket.Shutdown(SocketShutdown.Both); 
      Disconnect(); 
     } 
    } 
+1

仅供参考,有作为“C#.NET”没有这样的事。该语言被命名为“C#”。 – 2011-05-19 02:31:33

+0

谢谢你让我知道! – Andrew 2011-05-19 03:32:10

回答

1

基本上你创建一个缓冲区,并在每次接收数据时,您添加数据到缓冲区,并确定是否已经收到一个或多个完整的消息。

ReceiveCallbackStartRead之间,您将不会收到任何异步消息(传入数据将自动在套接字级缓冲),因此它是检查完整消息并将其从缓冲区中删除的理想位置。

所有变化都是可能的,包括接收消息1的结尾,加上消息2,再加上消息3的开始,全部在一个块中。

我不推荐使用UTF8解码块,因为一个UTF8字符可能由两个字节组成,并且如果它们在块之间分割,数据可能会损坏。在这种情况下,您可以保留一个字节[] - 缓冲区(MemoryStream?)并将消息分割为0x0A字节。

3

使用长度前缀固定块比使用分隔符更好。您不必处理任何类型的转义,以便以这种方式使用换行符发送数据。

此答案现在可能与您无关,因为它使用的功能来自AsyncCTP,它只会在下一个.net版本中使用。不过,它确实使事情更加简洁。从本质上讲,您可以准确编写您为同步案例执行的代码,但在有异步调用的地方插入“await”语句。

public static async Task<Byte[]> ReadChunkAsync(this Stream me) { 
     var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0); 
     checked { 
      return await me.ReadExactAsync((int)size); 
     } 
    } 

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) { 
     var buf = new byte[count]; 
     var t = 0; 
     while (t < count) { 
      var n = await me.ReadAsync(buf, t, count - t); 
      if (n <= 0) { 
       if (t > 0) throw new IOException("End of stream (fragmented)"); 
       throw new IOException("End of stream"); 
      } 
      t += n; 
     } 
     return buf; 
    } 

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) { 
     me.Write(BitConverter.GetBytes(count), 0, 4); 
     me.Write(buffer, offset, count); 
    } 
+1

+1用于前缀代替分隔 – 2011-05-19 15:06:51

0

好的,这是我最终做的。我创建了一个读取器线程,基于网络流创建NetworkStream和StreamReader。然后我使用StreamReader.ReadLine来读取这些行。这是一个同步调用,但它在它自己的线程中。它似乎工作得更好。我必须实现这一点,因为这是我们的应用程序协议(换行符分隔的消息)。我知道其他人会东张西望像地狱的答案像我一样,这里是我的客户端类的相关读码:

public class Client 
{ 
    Socket    m_Socket; 

    EventWaitHandle  m_WaitHandle; 
    readonly object  m_Locker; 
    Queue<IEvent>  m_Tasks; 
    Thread    m_Thread; 

    Thread    m_ReadThread; 

    public Client() 
    { 
     m_WaitHandle = new AutoResetEvent(false); 
     m_Locker = new object(); 
     m_Tasks = new Queue<IEvent>(); 

     m_Thread = new Thread(Run); 
     m_Thread.IsBackground = true; 
     m_Thread.Start(); 
    } 

    public void EnqueueTask(IEvent task) 
    { 
     lock (m_Locker) 
     { 
      m_Tasks.Enqueue(task); 
     } 

     m_WaitHandle.Set(); 
    } 

    private void Run() 
    { 
     while (true) 
     { 
      IEvent task = null; 

      lock (m_Locker) 
      { 
       if (m_Tasks.Count > 0) 
       { 
        task = m_Tasks.Dequeue(); 

        if (task == null) 
        { 
         return; 
        } 
       } 
      } 

      if (task != null) 
      { 
       task.DoTask(this); 
      } 
      else 
      { 
       m_WaitHandle.WaitOne(); 
      } 
     } 
    } 

    public void Connect(string hostname, int port) 
    { 
     try 
     { 
      m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 

      IPAddress[] IPs = Dns.GetHostAddresses(hostname); 

      m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket); 
     } 
     catch (SocketException) 
     { 
      m_Socket.Close(); 
      OnConnect(false, "Unable to connect to server."); 
     } 
    } 

    private void ConnectCallback(IAsyncResult ar) 
    { 
     try 
     { 
      Socket socket = (Socket)ar.AsyncState; 

      socket.EndConnect(ar); 

      OnConnect(true, "Successfully connected to server."); 

      m_ReadThread = new Thread(new ThreadStart(this.ReadThread)); 
      m_ReadThread.Name = "Read Thread"; 
      m_ReadThread.IsBackground = true; 
      m_ReadThread.Start(); 
     } 
     catch (SocketException) 
     { 
      m_Socket.Close(); 
      OnConnect(false, "Unable to connect to server."); 
     } 
    } 

    void ReadThread() 
    { 
     NetworkStream networkStream = new NetworkStream(m_Socket); 
     StreamReader reader = new StreamReader(networkStream); 

     while (true) 
     { 
      try 
      { 
       String message = reader.ReadLine(); 

       // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received. 
       EnqueueTask(new ServerMessageEvent(message)); 
      } 
      catch (IOException) 
      { 
       // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown... 
       Disconnect(); 
       break; 
      } 
     } 
    } 

    ... Code for sending/parsing the message in the Client class thread. 
}