2011-10-08 72 views
2

我有这个问题,其中一个系统包含推动要处理的消息的节点(windows服务),以及其他推送消息并处理它们的节点。Windows服务通过MSMQ进行通信 - 我需要服务总线吗?

设计时,推送节点通过在每次发送后维护一个循环队列列表和循环队列来平衡队列之间的负载。因此,消息1将进入队列1,消息2到队列2等。到目前为止,该部分工作得很好。

在消息的拉尾处,我们设计了这样一个消息,即以类似的方式检索消息 - 首先从队列1,然后从队列2等。理论上,每个拉节点位于不同的机器上,并且在实践中很远,它只在一个队列中进行监听。但最近的一项要求让我们在一台侦听多个队列的计算机上有一个拉节点:一个通常非常繁忙,充满了数百万条消息,而另一条通常只包含少量消息。

我们面临的问题是,我们最初构建的拉节点的方式是从队列到队列,直到找到消息。如果它超时(例如在一秒之后),则它移动到下一个队列。

这不再工作原因Q1(充满了数百万条消息)每条消息将被延迟大约一秒,因为从每次从Q1拉出后,我们会向Q2询问一条消息(如果它不包含任何消息,我们将等待第二)。

因此,它是这样的:

Q1包含10个消息和Q2不含有

  • 拉节点请求从Q1
  • Q1一个消息返回消息立即
  • 拉节点请求来自Q1的消息
  • ------------等待一秒-------------(Q2为空且请求超时)
  • 拉节点请求消息从Q1
  • Q1返回消息立即
  • 拉节点从Q1
  • 询问消息------------等待一个第二---- ---------(Q2是空的,请求超时)

因此,这显然是错误的。

我想我在这里寻找最好的建筑解决方案。消息处理不需要尽可能实时,但需要健壮,并且不应丢失任何消息!

我想听听您对这个问题的看法。

预先感谢 雅尼斯

+0

你可以避免所有的麻烦,每个队列使用一个处理线程,从而使每个队列处理独立于其他... – Yahia

+0

Yahia我已经结束了做你喜欢你的建议。如果有人需要,我会发布代码以备将来参考 – Yannis

回答

1

我最终创建了一组线程 - 每个msmq需要处理一个线程。在构造我初始化那些线程:

Storages.ForEach(queue => 
     { 
      Task task = Task.Factory.StartNew(() => 
      { 
       LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType()); 
       while (true) 
       { 
        WorkItem mime = queue.WaitAndRetrieve(); 
        if (mime != null) 
        { 
         _Semaphore.WaitOne(); 
         _LocalStorage.Enqueue(mime); 

         lock (_locker) Monitor.Pulse(_locker); 

         LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType()); 
        } 
       } 
      }); 
     }); 
  • 的_LocalStorage是线程安全Queue实现(ConcurrentQueue引入在.NET 4.0)

  • 信号量是一个计数信号以控制插入在_LocalStorage。 _LocalStorage基本上是收到消息的缓冲区,但我们不希望它在处理节点忙于工作时变得太大。其效果可能是我们检索到_LocalStorage中的所有msmq消息,但正忙于处理其中的5个左右。这对于弹性(如果程序意外终止,我们会丢失所有这些消息)以及在性能方面都很差,因为将所有这些项目保存在内存中的内存消耗将非常巨大。所以我们需要控制我们在_LocalStorage缓冲区队列中保存的项目数量。

  • 我们脉冲线程等待工作(见下文),一个新的项目加入到队列中做一个简单的Monitor.Pulse

,从队列中出队的工作项目的代码如下:

lock (_locker) 
      if (_LocalStorage.Count == 0) 
       Monitor.Wait(_locker); 

     WorkItem result; 
     if (_LocalStorage.TryDequeue(out result)) 
     { 
      _Semaphore.Release(); 
      return result; 
     } 

     return null; 

我希望这可以帮助某人解决类似的问题。

1

也许你可以使用MessageQueue类的ReceiveCompleted事件?不需要再轮询。