2011-03-28 104 views
1

我正在实现一个小型库,以便使用System.Net.Sockets.Socket更容易。它应该处理任意数量的监听和接收TCP套接字,并且实现速度很快很重要。确保线程调用Socket.XXXAsync保持活动状态以完成IO请求(IO完成端口,C#)

我正在使用XXXAsync方法和CLR ThreadPool回调库用户的委托(例如,每当成功发送消息或收到某些数据时)。最好的库本身不会启动任何线程。

库的用户可以访问的接口到我Sockets包装要发送消息或开始接收(许多其它方法和过载中)消息:

public interface IClientSocket { 
    // will eventually call Socket.SendAsync 
    IMessageHandle SendAsync(byte[] buffer, int offset, int length); 

    // will eventually call Socket.RecieveAsync 
    void StartReceiveAsync(); 
} 

XXXAsync该方法使用IO完成端口。因此,调用这些方法的线程必须保持活动状态,直到操作完成,否则操作失败,并且SocketError.OperationAborted(我认为是这种情况,或者不是?)。 对库的用户施加这样的限制是丑陋且容易出错的。

这里最好的选择是什么?

  • 调用ThreadPool.QueueUserWorkItem与委托调用XXXAsync方法?那安全吗?我在某处读过,ThreadPool不会停止具有任何IOCP的空闲线程。这将是很好的,因为它解决了上述问题。
    但是,对于很多TCP连接也可能不好。 在这种情况下,每个ThreadPool线程可能会调用其中一个挂起的ReceiveAsync调用。因此,即使当前工作负载很低并且许多线程空闲(并浪费了内存),ThreadPool也不会收缩。

  • 启动一个始终活动的专用线程并调用XXXAsync方法。例如,当库用户想要发送数据时,它将一个委托放入一个同步队列中,该线程将其弹出并调用SendAsync方法。
    我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核心机器上,发送只能由一个线程执行。

此外,两种解决方案都没有缝合最好,因为他们通过作业调用异步方法到另一个线程。这可以避免吗?

您认为如何? (谢谢!!)

托马斯

编辑1:

我能重现SocketError.OperationAborted问题,下面的测试程序(我认为这是正确的)。
编译,启动并telnet到端口127.0.0.1:10000。发送“t”或“T”并等待> 3秒钟。发送“T”时,ReceiveAsync调用在ThreadPool(工作)中完成,“t”开始一个新的线程,在3秒钟后终止(失败)。

using System; 
using System.Net; 
using System.Net.Sockets; 
using System.Text; 
using System.Threading; 
using System.Collections.Generic; 

namespace Test { 
    class Program { 
     static List<Socket> _referenceToSockets = new List<Socket>(); 
     static void Main(string[] args) { 
      Thread.CurrentThread.Name = "Main Thread"; 

      // Create a listening socket on Port 10000. 
      Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
      _referenceToSockets.Add(ServerSocket); 
      var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000); 
      ServerSocket.Bind(endPoint); 
      ServerSocket.Listen(50); 

      // Start listening. 
      var saeaAccept = new SocketAsyncEventArgs(); 
      saeaAccept.Completed += OnCompleted; 
      ServerSocket.AcceptAsync(saeaAccept); 

      Console.WriteLine(String.Format("Listening on {0}.", endPoint)); 
      Console.ReadLine(); 
     } 

     private static void OnCompleted(object obj, SocketAsyncEventArgs evt) { 
      var socket = (Socket)obj; 
      Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no")); 
      switch (evt.LastOperation) { 
       case SocketAsyncOperation.Accept: 
        // Client connected. Listen for more. 
        Socket clientSocket = evt.AcceptSocket; 
        _referenceToSockets.Add(clientSocket); 
        evt.AcceptSocket = null; 
        socket.AcceptAsync(evt); 

        // Start receiving data. 
        var saeaReceive = new SocketAsyncEventArgs(); 
        saeaReceive.Completed += OnCompleted; 
        saeaReceive.SetBuffer(new byte[1024], 0, 1024); 
        clientSocket.ReceiveAsync(saeaReceive); 
        break; 
       case SocketAsyncOperation.Disconnect: 
        socket.Close(); 
        evt.Dispose(); 
        break; 
       case SocketAsyncOperation.Receive: 
        if (evt.SocketError != SocketError.Success) { 
         socket.DisconnectAsync(evt); 
         return; 
        } 
        var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred); 
        Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText)); 
        if (evt.BytesTransferred == 0) { 
         socket.Close(); 
         evt.Dispose(); 
        } 
        if (asText.ToUpper().StartsWith("T")) { 
         Action<object> action = (object o) => { 
          socket.ReceiveAsync(evt); 
          Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o)); 
          Thread.Sleep(3000); 
          Console.WriteLine("End of Action..."); 
         }; 
         if (asText.StartsWith("T")) { 
          ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool"); 
         } else { 
          new Thread(o=>action(o)).Start("in a new Thread"); 
         } 
        } else { 
         socket.ReceiveAsync(evt); 
        } 
        break; 
      } 
     } 
    } 
} 

EDIT#3

以下是我打算使用该解决方案:

我让库使用者的螺纹直接调用XXXAsync(除了SendAsync)操作。在大多数情况下,调用将是成功的(因为调用线程确实很少终止)。
如果操作失败,并且SocketError.OperationAborted,库仅使用异步回调中的当前线程再次调用该操作。这是一个ThreadPool线程,它有很好的成功机会(如果SocketError.OperationAborted的原因是由于其他错误导致的,则最多使用此解决方法一次会设置一个附加标志)。 这应该工作,因为套接字本身仍然可以,只是以前的操作失败。

对于SendAsync,此解决方法不起作用,因为它可能会弄乱消息的顺序。在这种情况下,我会将消息排列在FIFO列表中。我将使用ThreadPool将其出列并通过SendAsync发送。

回答

1

这里有几个红旗。如果你在维持线程活动时遇到困难,那么你有一个线程没有任何用处。在这种情况下,使用Async方法是没有意义的。如果速度是你唯一关心的,那么你不应该使用Async方法。它们要求Socket抓取一个tp线程来进行回调。这是小开销,但有如果一个普通的线程进行阻塞呼叫。

当您需要在处理多个同时连接时使您的应用程序很好地扩展时,您应该只考虑异步方法。这产生非常突发的CPU负载,非常适合TP线程。

+0

这并不是说我有任何无用的线程正在运行。相反,我不希望库强制其用户确保自己的线程保持活动状态,直到完成所有已启动的操作。我想,保证这一点的唯一可能性是通过使用ThreadPool从库控件**或**中的线程进行XXXAsync调用。我想知道,CLR是否会保证它不会结束任何具有突出IO操作的CLR ThreadPool线程, – thaller 2011-03-29 11:10:25

+0

如果库客户端不想坚持读取结果,是不是你想要的异常? – 2011-03-29 11:24:08

+0

仅仅因为库客户端让线程调用SendAsync终止,他仍然在意读取结果。他期望在另一个线程(CRL'ThreadPool')中得到结果,而不管他用于调用RecvAsync的线程发生了什么。此外,库从客户端获取SocketError,但套接字仍然正常,只是该特定操作失败(或者套接字关闭了吗?请检查...)。如果收到它容易,只需重复它。对于发送数据更为复杂,因为应用程序TCP协议可能不允许丢失消息。 – thaller 2011-03-29 11:57:18

0

它调用,直到操作完成”

我不相信这句话的veractiy这些方法必须存活线程。当你开始在网络操作这被传递到Windows IO系统当网络事件发生时(即收到数据并且你的应用程序被通知),这可以由另一个线程处理(即使启动线程早已死亡)。

你确定你不要放弃对重要事物的引用,并允许它是垃圾集?

如果您处于允许线程死亡的情况,那么我会怀疑您的代码没有准备好扩展到超快/恒星#的客户端。如果你真的想要扩展,你应该非常严格地控制线程的使用。让一个线程死亡的代价(我假设另一个代码需要旋转来代替它)并不是你应该允许发生太多的代价。线程池非常适合这里。

+0

嗨斯佩德,我添加了我的测试代码来重现问题。我认为可以这样看待问题。我同意你的第二段。一种选择是用编译时间标志来编译具有不安全行为的库以获得高性能(在分析存在实际瓶颈之后:-))。我只是想知道什么是最好的解决方案。 – thaller 2011-03-28 23:09:36

相关问题