2013-04-30 70 views
1

我已经做了一个Windows服务,从MSMQueue读取消息,我需要并行执行(两个线程应该同时读取消息)。我怎样才能做到这一点? 这里是我的代码(漂亮的书多):多线程MSMQ读取

public partial class MyNewService : ServiceBase 
    { 
     System.Messaging.MessageQueue mq; 
     System.Messaging.Message mes; 

     public MyNewService() 
     { 
      InitializeComponent(); 

      if (MessageQueue.Exists("MyServer\\MyQueue")) 
       mq = new System.Messaging.MessageQueue("MyServer\\MyQueue"); 

      mq.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted); 
      mq.BeginReceive(); 

     } 

     private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult) 
     { 
      try 
      { 
       MessageQueue mq = (MessageQueue)source; 
       Message m = mq.EndReceive(asyncResult.AsyncResult); 

       // TODO: Process the m message here 

       // Restart the asynchronous receive operation. 
       mq.BeginReceive(); 
      } 
      catch(MessageQueueException) 
      { 
      // Handle sources of MessageQueueException. 
      } 

      return; 
     } 

} 

这是我的主要功能:

static class Program 
    { 
    static void Main() 
    { 
     ServiceBase[] ServicesToRun; 
     ServicesToRun = new ServiceBase[] 
      { 
       new MyNewService() 
      }; 
     ServiceBase.Run(ServicesToRun); 
    } 
    } 
+2

什么问题或问题? – 2013-04-30 20:05:22

+2

在我看来,像从队列中选择消息这样的东西不会从并行化中受益。你确定这不是真正的处理你想要并行化的消息所产生的工作量吗? – spender 2013-04-30 20:08:55

+0

这是空洞的故事:我已经将MSMQ阅读器作为控制台应用程序。当我在Queue中发送1000条消息时,该读者的一个实例在大约15分钟内处理它们。当我启动该控制台应用程序的两个实例时,它们在8分钟内完成。现在,我有相同的阅读器(需要15分钟处理)Windows服务(代码上面)。如何在代码中创建该阅读器的两个实例,以便他们更快地阅读消息(必须在代码中)? – 2013-05-01 12:40:27

回答

1

我将举办队列读者的单线程例如在多个窗口服务。

通过这种方式,您可以通过增加额外的服务来提高吞吐量,或者通过降低回退来节制。这比试图在代码中完成这一切要简单得多。

3

是否有一个原因,你不能只在多个线程上进行处理而不是在多个线程上出队?

这里是一个非常基本实现 - 它使用ThreadPool排队的项目,但你所依赖的ThreadPool的队列处理线程的数量和工作项目的数量。这可能不适合您的情况,这取决于许多其他因素。

此外,请注意关于SetMaxThreadshere的备注部分。

private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult) 
{ 
    try 
    { 
     MessageQueue mq = (MessageQueue)source; 
     Message m = mq.EndReceive(asyncResult.AsyncResult); 

     // TODO: Process each message on a separate thread 
     // This will immediately queue all items on the threadpool, 
     // so there may be more threads spawned than you really want 
     // Change how many items are allowed to process concurrently using ThreadPool.SetMaxThreads() 
     System.Threading.ThreadPool.QueueUserWorkItem(new WaitCallback(doWork), m); 


     // Restart the asynchronous receive operation. 
     mq.BeginReceive(); 
    } 
    catch(MessageQueueException) 
    { 
     // Handle sources of MessageQueueException. 
    } 

    return; 
} 

private static void doWork(object message) 
{ 
    // TODO: Actual implementation here. 
}