我正在实现一个小型库,以便使用System.Net.Sockets.Socket
更容易。它应该处理任意数量的监听和接收TCP套接字,并且实现速度很快很重要。确保线程调用Socket.XXXAsync保持活动状态以完成IO请求(IO完成端口,C#)
我正在使用XXXAsync
方法和CLR ThreadPool
回调库用户的委托(例如,每当成功发送消息或收到某些数据时)。最好的库本身不会启动任何线程。
库的用户可以访问的接口到我Sockets
包装要发送消息或开始接收(许多其它方法和过载中)消息:
public interface IClientSocket {
// will eventually call Socket.SendAsync
IMessageHandle SendAsync(byte[] buffer, int offset, int length);
// will eventually call Socket.RecieveAsync
void StartReceiveAsync();
}
XXXAsync
该方法使用IO完成端口。因此,调用这些方法的线程必须保持活动状态,直到操作完成,否则操作失败,并且SocketError.OperationAborted
(我认为是这种情况,或者不是?)。 对库的用户施加这样的限制是丑陋且容易出错的。
这里最好的选择是什么?
调用
ThreadPool.QueueUserWorkItem
与委托调用XXXAsync
方法?那安全吗?我在某处读过,ThreadPool不会停止具有任何IOCP的空闲线程。这将是很好的,因为它解决了上述问题。
但是,对于很多TCP连接也可能不好。 在这种情况下,每个ThreadPool线程可能会调用其中一个挂起的ReceiveAsync
调用。因此,即使当前工作负载很低并且许多线程空闲(并浪费了内存),ThreadPool也不会收缩。启动一个始终活动的专用线程并调用
XXXAsync
方法。例如,当库用户想要发送数据时,它将一个委托放入一个同步队列中,该线程将其弹出并调用SendAsync
方法。
我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核心机器上,发送只能由一个线程执行。
此外,两种解决方案都没有缝合最好,因为他们通过作业调用异步方法到另一个线程。这可以避免吗?
您认为如何? (谢谢!!)
托马斯
编辑1:
我能重现SocketError.OperationAborted
问题,下面的测试程序(我认为这是正确的)。
编译,启动并telnet到端口127.0.0.1:10000。发送“t”或“T”并等待> 3秒钟。发送“T”时,ReceiveAsync
调用在ThreadPool(工作)中完成,“t”开始一个新的线程,在3秒钟后终止(失败)。
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Test {
class Program {
static List<Socket> _referenceToSockets = new List<Socket>();
static void Main(string[] args) {
Thread.CurrentThread.Name = "Main Thread";
// Create a listening socket on Port 10000.
Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_referenceToSockets.Add(ServerSocket);
var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
ServerSocket.Bind(endPoint);
ServerSocket.Listen(50);
// Start listening.
var saeaAccept = new SocketAsyncEventArgs();
saeaAccept.Completed += OnCompleted;
ServerSocket.AcceptAsync(saeaAccept);
Console.WriteLine(String.Format("Listening on {0}.", endPoint));
Console.ReadLine();
}
private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
var socket = (Socket)obj;
Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
switch (evt.LastOperation) {
case SocketAsyncOperation.Accept:
// Client connected. Listen for more.
Socket clientSocket = evt.AcceptSocket;
_referenceToSockets.Add(clientSocket);
evt.AcceptSocket = null;
socket.AcceptAsync(evt);
// Start receiving data.
var saeaReceive = new SocketAsyncEventArgs();
saeaReceive.Completed += OnCompleted;
saeaReceive.SetBuffer(new byte[1024], 0, 1024);
clientSocket.ReceiveAsync(saeaReceive);
break;
case SocketAsyncOperation.Disconnect:
socket.Close();
evt.Dispose();
break;
case SocketAsyncOperation.Receive:
if (evt.SocketError != SocketError.Success) {
socket.DisconnectAsync(evt);
return;
}
var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
if (evt.BytesTransferred == 0) {
socket.Close();
evt.Dispose();
}
if (asText.ToUpper().StartsWith("T")) {
Action<object> action = (object o) => {
socket.ReceiveAsync(evt);
Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
Thread.Sleep(3000);
Console.WriteLine("End of Action...");
};
if (asText.StartsWith("T")) {
ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
} else {
new Thread(o=>action(o)).Start("in a new Thread");
}
} else {
socket.ReceiveAsync(evt);
}
break;
}
}
}
}
EDIT#3
以下是我打算使用该解决方案:
我让库使用者的螺纹直接调用XXXAsync
(除了SendAsync
)操作。在大多数情况下,调用将是成功的(因为调用线程确实很少终止)。
如果操作失败,并且SocketError.OperationAborted
,库仅使用异步回调中的当前线程再次调用该操作。这是一个ThreadPool
线程,它有很好的成功机会(如果SocketError.OperationAborted
的原因是由于其他错误导致的,则最多使用此解决方法一次会设置一个附加标志)。 这应该工作,因为套接字本身仍然可以,只是以前的操作失败。
对于SendAsync
,此解决方法不起作用,因为它可能会弄乱消息的顺序。在这种情况下,我会将消息排列在FIFO列表中。我将使用ThreadPool
将其出列并通过SendAsync
发送。
这并不是说我有任何无用的线程正在运行。相反,我不希望库强制其用户确保自己的线程保持活动状态,直到完成所有已启动的操作。我想,保证这一点的唯一可能性是通过使用ThreadPool从库控件**或**中的线程进行XXXAsync调用。我想知道,CLR是否会保证它不会结束任何具有突出IO操作的CLR ThreadPool线程, – thaller 2011-03-29 11:10:25
如果库客户端不想坚持读取结果,是不是你想要的异常? – 2011-03-29 11:24:08
仅仅因为库客户端让线程调用SendAsync终止,他仍然在意读取结果。他期望在另一个线程(CRL'ThreadPool')中得到结果,而不管他用于调用RecvAsync的线程发生了什么。此外,库从客户端获取SocketError,但套接字仍然正常,只是该特定操作失败(或者套接字关闭了吗?请检查...)。如果收到它容易,只需重复它。对于发送数据更为复杂,因为应用程序TCP协议可能不允许丢失消息。 – thaller 2011-03-29 11:57:18