2013-06-28 49 views
3

我在写一个基于TCP的客户端,需要发送和接收数据。我已经使用.NET框架为Socket类提供的Asynchronous Programming Model (APM)使用TcpClient /套接字传输/发送大包安全

连接到套接字后,我开始使用BeginReceive等待套接字上的数据。

现在,当我在等待Socket上的数据时,可能需要通过套接字发送数据。和发送方法可以被调用多次,

所以我要确保

  • 所有以前Send通话字节完全发送。
  • 我发送数据的方式是安全的,因为在数据发送过程中,可以发送任何发送数据的呼叫。

这是我在socket上的第一个工作,那么我的方法是发送数据吗?

private readonly object writeLock = new object(); 
    public void Send(NetworkCommand cmd) 
    { 
     var data = cmd.ToBytesWithLengthPrefix(); 
     ThreadPool.QueueUserWorkItem(AsyncDataSent, data); 
    } 

    private int bytesSent; 
    private void AsyncDataSent(object odata) 
    { 
     lock (writeLock) 
     { 
      var data = (byte[])odata; 
      int total = data.Length; 
      bytesSent = 0; 
      int buf = Globals.BUFFER_SIZE; 
      while (bytesSent < total) 
      { 
       if (total - bytesSent < Globals.BUFFER_SIZE) 
       { 
        buf = total - bytesSent; 
       } 
       IAsyncResult ar = socket.BeginSend(data, bytesSent, buf, SocketFlags.None, DataSentCallback, data); 
       ar.AsyncWaitHandle.WaitOne(); 
      } 
     } 
    } 

如何对象变成byte[],有时NetworkCommand可大如0.5 MB

public byte[] ToBytesWithLengthPrefix() 
    { 
     var stream = new MemoryStream(); 
     try 
     { 
      Serializer.SerializeWithLengthPrefix(stream, this, PrefixStyle.Fixed32); 
      return stream.ToArray(); 
     } 
     finally 
     { 
      stream.Close(); 
      stream.Dispose(); 
     } 
    } 

完整的类

namespace Cybotech.Network 
{ 
    public delegate void ConnectedDelegate(IPEndPoint ep); 
    public delegate void DisconnectedDelegate(IPEndPoint ep); 
    public delegate void CommandReceivedDelagate(IPEndPoint ep, NetworkCommand cmd); 
} 


using System; 
using System.Net; 
using System.Net.Sockets; 
using Cybotech.Helper; 
using Cybotech.IO; 

namespace Cybotech.Network 
{ 
    public class ClientState : IDisposable 
    { 
     private int _id; 
     private int _port; 
     private IPAddress _ip; 
     private IPEndPoint _endPoint; 
     private Socket _socket; 
     private ForwardStream _stream; 
     private byte[] _buffer; 

     public ClientState(IPEndPoint endPoint, Socket socket) 
     { 
      Init(endPoint, socket); 
     } 

     private void Init(IPEndPoint endPoint, Socket socket) 
     { 
      _endPoint = endPoint; 
      _ip = _endPoint.Address; 
      _port = _endPoint.Port; 
      _id = endPoint.GetHashCode(); 
      _socket = socket; 
      _stream = new ForwardStream(); 
      _buffer = new byte[Globals.BUFFER_SIZE]; 
     } 

     public int Id 
     { 
      get { return _id; } 
     } 

     public int Port 
     { 
      get { return _port; } 
     } 

     public IPAddress Ip 
     { 
      get { return _ip; } 
     } 

     public IPEndPoint EndPoint 
     { 
      get { return _endPoint; } 
     } 

     public Socket Socket 
     { 
      get { return _socket; } 
     } 

     public ForwardStream Stream 
     { 
      get { return _stream; } 
     } 

     public byte[] Buffer 
     { 
      get { return _buffer; } 
      set { _buffer = value; } 
     } 

     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_stream != null) 
       { 
        _stream.Close(); 
        _stream.Dispose(); 
       } 

       if (_socket != null) 
       { 
        _socket.Close(); 
       } 
      } 
     } 

     public void Dispose() 
     { 
      Dispose(true); 
     } 
    } 
} 

using System; 
using System.Collections.Generic; 
using System.Net; 
using System.Net.Sockets; 
using Cybotech.Command; 
using Cybotech.Network; 

namespace ExamServer.Network 
{ 
    public class TcpServer : IDisposable 
    { 

     private Socket socket; 
     private bool secure; 

     private readonly Dictionary<IPEndPoint, ClientState> clients = new Dictionary<IPEndPoint, ClientState>(); 

     //public events 
     #region Events 

     public event CommandDelegate CommandReceived; 
     public event ConnectedDelegate ClientAdded; 
     public event DisconnectedDelegate ClientRemoved; 

     #endregion 

     //event invokers 
     #region Event Invoke methods 

     protected virtual void OnCommandReceived(IPEndPoint ep, NetworkCommand command) 
     { 
      CommandDelegate handler = CommandReceived; 
      if (handler != null) handler(ep, command); 
     } 

     protected virtual void OnClientAdded(IPEndPoint ep) 
     { 
      ConnectedDelegate handler = ClientAdded; 
      if (handler != null) handler(ep); 
     } 

     protected virtual void OnClientDisconnect(IPEndPoint ep) 
     { 
      DisconnectedDelegate handler = ClientRemoved; 
      if (handler != null) handler(ep); 
     } 

     #endregion 

     //public property 
     public string CertificatePath { get; set; } 

     public TcpServer(EndPoint endPoint, bool secure) 
     { 
      StartServer(endPoint, secure); 
     } 

     public TcpServer(IPAddress ip, int port, bool secure) 
     { 
      StartServer(new IPEndPoint(ip, port), secure); 
     } 

     public TcpServer(string host, int port, bool secure) 
     { 
      StartServer(new IPEndPoint(IPAddress.Parse(host), port), secure); 
     } 

     private void StartServer(EndPoint ep, bool ssl) 
     { 
      socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
      socket.Bind(ep); 
      socket.Listen(150); 
      this.secure = ssl; 

      socket.BeginAccept(AcceptClientCallback, null); 
     } 

     private void AcceptClientCallback(IAsyncResult ar) 
     { 
      Socket client = socket.EndAccept(ar); 
      var ep = (IPEndPoint) client.RemoteEndPoint; 
      var state = new ClientState(ep, client); 
      if (secure) 
      { 
       //TODO : handle client for ssl authentication 
      } 

      //add client to 
      clients.Add(ep, state); 
      OnClientAdded(ep); 
      client.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ReceiveDataCallback, state); 

      //var thread = new Thread(ReceiveDataCallback); 
      //thread.Start(state); 
     } 

     private void ReceiveDataCallback(IAsyncResult ar) 
     { 
      ClientState state = (ClientState)ar.AsyncState; 

      try 
      { 
       var bytesRead = state.Socket.EndReceive(ar); 
       state.Stream.Write(state.Buffer, 0, bytesRead); 

       // check available commands 
       while (state.Stream.LengthPrefix > 0) 
       { 
        NetworkCommand cmd = NetworkCommand.CreateFromStream(state.Stream); 
        OnCommandReceived(state.EndPoint, cmd); 
       } 

       //start reading data again 
       state.Socket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ReceiveDataCallback, state); 
      } 
      catch (SocketException ex) 
      { 
       if (ex.NativeErrorCode.Equals(10054)) 
       { 
        RemoveClient(state.EndPoint); 
       } 
      } 
     } 

     private void RemoveClient(IPEndPoint ep) 
     { 

      OnClientDisconnect(ep); 
     } 

     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       //TODO : dispose all the client related socket stuff 
      } 
     } 

     public void Dispose() 
     { 
      Dispose(true); 
     } 
    } 
} 
+0

ThreadPool.QueueUserWorkItem(AsyncDataSent,data); - 发送呼叫可能会导致微妙的错误。你需要发送以FIFO的方式工作吗?即如果两个发送呼叫A和B彼此非常接近(按照该顺序),则A应当首先发生,然后B?我不认为ThreadPool.Queue保证FIFO - 所以你可能很难捕捉到bug ... – Vivek

回答

3

相同的客户端将无法发送数据给您,除非他完成发送当前字节。

所以在服务器端,你会收到完整的数据,不会被来自该客户端的其他新消息中断,但要考虑到如果发送的消息太大, ,收到完毕后最后是一条消息。

2

当你正在使用TCP,网络ocol将确保数据包以与发送相同的顺序接收。
关于线程安全性,它取决于您用于发送的实际类别。您提供的代码片段中缺少声明部分。
由名称给出您似乎使用Socket并且这是线程安全的,因此每个发送实际上是原子的,如果您使用Stream的任何风味,那么它不是线程安全的,并且您需要某种形式的同步,如锁,无论如何你目前正在使用它。
如果您要发送大数据包,那么将接收和处理部分拆分为两个不同的线程很重要。 TCP缓冲区实际上比人们想象的要小得多,不幸的是它在日志满时不会被覆盖,因为协议将继续执行重发,直到收到所有内容。

+0

我已更新完整的类使用的代码,可能现在你可以有一个更接近的看,我实际上只有发送方法怀疑,就是它正确发送数据 –

+0

好吧 - 我想你可以安全地使用一个发送使用缓冲区大小。如前所述,我会更多地研究在服务器大小上有一个读取缓冲区,因为这是更可能超载的一面。以较小的包装发送将无济于事,这完全取决于接收机尽快清洗缓冲区。当所有内容都被缓冲时,客户端发送chunck并不重要。 – weismat