2014-07-15 57 views

回答

0

您可以像使用常规队列一样使用来自DLX(实际上来自DLQ)的消息。你提出的建议(从DLQ获取消息并将它发布到队列中,它最初是从静止的队列中)可能会导致消息循环(通常它会)。

最佳实践方式是以某种单独的方式处理死锁消息,或者根本不要盲注它们。

+0

那里是没有错的信息,但我们在处理一个错误(如丢失连接等)它会方便重新发送消息并尝试重新处理。我同意,如果死信将所有事情自动回馈给大多数情况下会导致无限循环。 – jhilden

+0

使用消息确认可防止消费者意外失败时丢失消息。 – pinepain

+0

Zaq,我们克服了失败,使他们不会永远陷入队列中。我喜欢你的概念,我们只需要确保那些真正应该被忽略的东西,而且可以重新尝试的东西仍然存在。 – jhilden

0

我没有找到一个内置的方式来做到这一点,所以我创建了我自己的小解决方案。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Framing.v0_9_1; 

namespace RabbitMQReprocessDeadLetter 
{ 
    public class RabbitReprocessor 
    { 
     private readonly IModel _model; 
     private readonly string _deadLetterQueueName; 
     private const ushort FetchSize = 10; 
     private const string ConsumerName = "DeadLetterReprocessor"; 

     public RabbitReprocessor(IConnection rabbitConnection, string deadLetterQueueName) 
     { 
      _deadLetterQueueName = deadLetterQueueName; 
      _model = rabbitConnection.CreateModel(); 
     } 

     public void StartConsuming(CancellationTokenSource cancellationTokenSource = null) 
     { 
      // Configure the Quality of service for the model. Below is how what each setting means. 
      // BasicQos(0="Dont send me a new message untill I’ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only") 
      _model.BasicQos(0, FetchSize, false); 

      var queueingBasicConsumer = new QueueingBasicConsumer(_model); 
      _model.BasicConsume(_deadLetterQueueName, false, ConsumerName, queueingBasicConsumer); 

      while (true) 
      { 
       if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested) 
       { 
        return; 
       } 

       var e = queueingBasicConsumer.Queue.Dequeue(); // blocking call 
       var deathProperties = (List<object>) e.BasicProperties.Headers["x-death"]; 
       var prop = (Dictionary<string, object>)deathProperties.Single(); 
       var queueAsByteArray = (byte[])prop["queue"]; 
       var queueName = queueAsByteArray.ConvertToString(); 
       var data = e.Body; 
       try 
       { 
        Console.WriteLine("{0} => {1}", queueName, data.Deserialize<long>()); 
       } 
       // ReSharper disable once EmptyGeneralCatchClause 
       catch { } 
       SendMessageToQueue(queueName, data); 
       _model.BasicAck(e.DeliveryTag, false); 
      } 
     } 

     /// <summary> 
     /// delivery the message directly into the queue from which it came. 
     /// 
     /// You may want to put it back into an exchange instead of a queue. 
     /// </summary> 
     private void SendMessageToQueue(string queueName, byte[] messageBytes) 
     { 
      const string exchangeName = ""; 
      if (string.IsNullOrEmpty(queueName)) 
      { 
       throw new ArgumentNullException("queueName"); 
      } 
      if (messageBytes == null) 
      { 
       throw new ArgumentNullException("messageBytes"); 
      } 
      var basicProperties = new BasicProperties 
      { 
       DeliveryMode = 2//2 = durable 
      }; 
      _model.BasicPublish(exchangeName, queueName, basicProperties, messageBytes); 
     } 
    } 
} 

完整的解决方案可以在这里找到:

https://github.com/jayhilden/RabbitMQReprocessDeadLetter

的情况下