2013-03-29 20 views
0

我在写一个服务器应用程序,它将接收来自多个TCP连接的数据。我们希望能够扩展到200个连接。第一种算法,我写这个如下:等待多个TcpClient有数据可用 - WaitHandle或Thread.Sleep?

while (keepListening) 
{ 
    foreach (TcpClient client in clientList) 
    { 
     if (!client.Connected) 
     { 
      client.Close(); 
      deleteList.Add(client); 
      continue; 
     } 

     int dataAvail = client.Available; 
     if (dataAvail > 0) 
     { 
      NetworkStream netstr = client.GetStream(); 
      byte[] arry = new byte[dataAvail]; 
      netstr.Read(arry, 0, dataAvail); 
      MemoryStream ms = new MemoryStream(arry); 
      try 
      { 
       CommData data = dataDeserializer.Deserialize(ms) as CommData; 
       beaconTable.BeaconReceived(data); 
      } 
      catch 
      { } 
     } 
    } 

    foreach (TcpClient clientToDelete in deleteList) 
     clientList.Remove(clientToDelete); 
    deleteList.Clear(); 

    while (connectionListener.Pending()) 
     clientList.Add(connectionListener.AcceptTcpClient()); 

    Thread.Sleep(20); 
} 

这工作得很好,但我发现我必须添加的Thread.Sleep到循环减慢,否则它占据了整个核心,无论有多少或几个连接。我被告知Thread.Sleep通常被认为是不好的,所以我寻找了一些替代品。在一个类似的问题,这一点,我建议使用采用WaitHandles的BeginRead和BeginAccept,所以我写了一个算法做使用同样的事情,以及与此想出了:

while (keepListening) 
{ 
    int waitResult = WaitHandle.WaitAny(waitList.Select(t => t.AsyncHandle.AsyncWaitHandle).ToArray(), connectionTimeout); 

    if (waitResult == WaitHandle.WaitTimeout) 
     continue; 

    WaitObject waitObject = waitList[waitResult]; 
    Type waitType = waitObject.WaitingObject.GetType(); 

    if (waitType == typeof(TcpListener)) 
    { 
     TcpClient newClient = (waitObject.WaitingObject as TcpListener).EndAcceptTcpClient(waitObject.AsyncHandle); 
     waitList.Remove(waitObject); 

     byte[] newBuffer = new byte[bufferSize]; 
     waitList.Add(new WaitObject(newClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), newClient, newBuffer)); 

     if (waitList.Count < 64) 
      waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null)); 
     else 
     { 
      connectionListener.Stop(); 
      listening = false; 
     } 
    } 
    else if (waitType == typeof(TcpClient)) 
    { 
     TcpClient currentClient = waitObject.WaitingObject as TcpClient; 
     int bytesRead = currentClient.GetStream().EndRead(waitObject.AsyncHandle); 

     if (bytesRead > 0) 
     { 
      MemoryStream ms = new MemoryStream(waitObject.DataBuffer, 0, bytesRead); 
      try 
      { 
       CommData data = dataDeserializer.Deserialize(ms) as CommData; 
       beaconTable.BeaconReceived(data); 
      } 
      catch 
      { } 
      byte[] newBuffer = new byte[bufferSize]; 
      waitList.Add(new WaitObject(currentClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), currentClient, newBuffer)); 
     } 
     else 
     { 
      currentClient.Close(); 
     } 

     waitList.Remove(waitObject); 

     if (!listening && waitList.Count < 64) 
     { 
      listening = true; 
      connectionListener.Start(); 
      waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null)); 
     } 
    } 
    else 
     throw new ApplicationException("An unknown type ended up in the wait list somehow: " + waitType.ToString()); 
} 

这也工作正常,直到我打64个客户。我写了一个限制,不接受超过64个客户端,因为这是WaitAny可以接受的最大数量的WaitHandles。我看不出有什么好办法绕过这个限制,所以我基本上不能保持超过64个这样的连接。 Thread.Sleep算法适用于100多个连接。

我也不喜欢必须预先分配任意大小的接收数组,而不是在收到数据后按接收数据的确切大小分配接收数组。而且我必须给WaitAny一个超时,否则当我关闭应用程序时,如果没有连接,它会阻止线程从Join-ing运行。它通常更长更复杂。

那么为什么Thread.Sleep更糟糕的解决方案?有什么办法可以至少让WaitAny版本处理超过64个连接?有没有完全不同的处理方式,我没有看到?

+3

我有点困惑为什么你用'WaitHandles'完全用'BeginRead'。只需提供一个'AsyncCallback'。读取完成时将调用该方法。有关示例,请参阅http://stackoverflow.com/q/6023264/56778。另外,请检查相关的问题。 –

+0

好主意。我最初使用回调来解雇,因为它看起来太复杂了,但我试了一下,实际上很简单。我将自行发布代码。 – Mason

回答

0

Jim给出了使用异步回调而不是WaitHandles的明显建议。我最初认为这太复杂了,但是一旦我意识到我可以将一个引用传递给状态对象中的调用TcpListener或TcpClient,它就变得更简单了。随着线程安全性的一些改变,它已经准备好了。它可以通过超过100个连接进行测试,并且没有问题退出干净。不过,我仍然希望有一种选择,即不得不预先分配数据缓冲区。这里是任何人尝试类似的代码:

public class NetworkReceiver : IDisposable 
{ 
    private IReceiver beaconTable; 
    private XmlSerializer dataDeserializer; 
    private HashSet<TcpClient> ClientTable; 
    private TcpListener connectionListener; 
    private int bufferSize = 1000; 

    public NetworkReceiver(IReceiver inputTable) 
    { 
     beaconTable = inputTable; 
     dataDeserializer = new XmlSerializer(typeof(CommData)); 

     ClientTable = new HashSet<TcpClient>(); 
     connectionListener = new TcpListener(IPAddress.Any, SharedData.connectionPort); 
     connectionListener.Start(); 
     connectionListener.BeginAcceptTcpClient(ListenerCallback, connectionListener); 
    } 

    private void ListenerCallback(IAsyncResult callbackResult) 
    { 
     TcpListener listener = callbackResult.AsyncState as TcpListener; 
     TcpClient client; 

     try 
     { 
      client = listener.EndAcceptTcpClient(callbackResult); 

      lock (ClientTable) 
       ClientTable.Add(client); 

      ClientObject clientObj = new ClientObject() { AsyncClient = client, Buffer = new byte[bufferSize] }; 
      client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj); 

      listener.BeginAcceptTcpClient(ListenerCallback, listener); 
     } 
     catch (ObjectDisposedException) 
     { 
      return; 
     } 
    } 

    private void ClientReadCallback(IAsyncResult callbackResult) 
    { 
     ClientObject clientObj = callbackResult.AsyncState as ClientObject; 
     TcpClient client = clientObj.AsyncClient; 

     if (!client.Connected) 
      return; 

     try 
     { 
      int bytesRead = client.GetStream().EndRead(callbackResult); 
      if (bytesRead > 0) 
      { 
       MemoryStream ms = new MemoryStream(clientObj.Buffer, 0, bytesRead); 
       try 
       { 
        CommData data; 
        lock (dataDeserializer) 
         data = dataDeserializer.Deserialize(ms) as CommData; 
        lock (beaconTable) 
         beaconTable.BeaconReceived(data); 
       } 
       catch 
       { } 

       client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj); 
      } 
      else 
      { 
       client.Close(); 
       lock (ClientTable) 
        ClientTable.Remove(client); 
      } 
     } 
     catch (Exception ex) 
     { 
      if (ex.GetType() == typeof(ObjectDisposedException) || ex.GetType() == typeof(InvalidOperationException)) 
       return; 
      else 
       throw; 
     } 
    } 

    class ClientObject 
    { 
     public TcpClient AsyncClient; 
     public byte[] Buffer; 
    } 

    public void Dispose() 
    { 
     connectionListener.Stop(); 
     foreach (TcpClient client in ClientTable) 
      client.Close(); 
    } 
}