在我的应用程序中,我必须侦听多个不同的队列并反序列化/调度在队列上接收到的传入消息。用任务替换无限线程循环(消息泵)
实际上,我在做什么来实现这一目标是每个QueueConnector对象在构造时创建一个新线程,该线程通过对queue.Receive()的阻塞调用执行一个无限循环,以接收队列中的下一条消息,下面的代码:
// Instantiate message pump thread
msmqPumpThread = new Thread(() => while (true)
{
// Blocking call (infinite timeout)
// Wait for a new message to come in queue and get it
var message = queue.Receive();
// Deserialize/Dispatch message
DeserializeAndDispatchMessage(message);
}).Start();
我想知道,如果这个“消息泵”,可以使用任务(一个或多个)所取代,而不是通过一个新的线程无限循环下去。
我已经为消息接收部分做了一个任务(见下面),但我真的不知道如何将它用于消息泵(我可以一遍又一遍记得一遍又一遍地完成同一个任务,在代码替换单独的线程无限循环如上?)
Task<Message> GetMessageFromQueueAsync()
{
var tcs = new TaskCompletionSource<Message>();
ReceiveCompletedEventHandler receiveCompletedHandler = null;
receiveCompletedHandler = (s, e) =>
{
queue.ReceiveCompleted -= receiveCompletedHandler;
tcs.SetResult(e.Message);
};
queue.BeginReceive();
return tcs.Task;
}
我将获得通过使用任务而不是无限循环的任何一个单独的线程(在此上下文中,阻塞调用=>阻塞线程)?如果是的话,如何正确地做到这一点?
请注意,此应用程序没有很多QueueConnector对象,并且不会有(可能有10个连接器MAX),这意味着通过第一个解决方案的最大线程数为10,因此内存占用量/性能启动线程不是问题在这里。我宁愿考虑调度性能/ CPU使用率。会有什么区别吗?
这是一个tipycal生产者 - 消费者场景,可以使用['BlockingCollection'](http://msdn.microsoft.com/en-us/library/dd267312.aspx)解决,特别是使用['BlockingCollection。GetConsumingEnumerable'](http://msdn.microsoft.com/en-us/library/dd287186.aspx) –
2012-07-05 18:26:49
感谢Paolo! 但据我所知,生产者/消费者队列更适合于计算绑定任务(执行密集计算),而TaskCompletionSource /异步函数更适合于I/O绑定任务(等待发生的事情)。 由于我的问题处理I/O绑定任务(等待消息进入队列),我认为TaskCompletionSource会更加合适。然而,我可能是错的。 – darkey 2012-07-05 21:29:56