2015-08-18 84 views
4

我正在寻找一种可靠的方式来重新排队无法正确处理的消息 - 此时。重新排队异常的消息

我一直在寻找http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,它似乎支持在RabbitMQ API中重新发送消息。

else //reject the message but push back to queue for later re-try 
{ 
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message); 
    model.BasicReject(deliveryArguments.DeliveryTag, true); 
} 

但是我使用EasyNetQ。 所以想知道我会在这里做类似的事情。

bus.Subscribe<MyMessage>("my_subscription_id", msg => { 
    try 
    { 
     // do work... could be long running 
    } 
    catch() 
    { 
     // something went wrong - requeue message 
    } 
}); 

这是一个很好的方法吗?不是ACK如果do work超过RabbitMQ服务器等待ACK超时的时间,则该消息可能会导致问题。

回答

7

所以我想出了这个解决方案。它取代EasyNetQ的默认错误策略。

public class DeadLetterStrategy : DefaultConsumerErrorStrategy 
{ 
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer) 
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer) 
    { 
    } 

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 
    { 
     object deathHeaderObject; 
     if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject)) 
      return AckStrategies.NackWithoutRequeue; 

     var deathHeaders = deathHeaderObject as IList; 

     if (deathHeaders == null) 
      return AckStrategies.NackWithoutRequeue; 

     var retries = 0; 
     foreach (IDictionary header in deathHeaders) 
     { 
      var count = int.Parse(header["count"].ToString()); 
      retries += count; 
     } 

     if (retries < 3) 
      return AckStrategies.NackWithoutRequeue; 
     return base.HandleConsumerError(context, exception); 
    } 
} 

你喜欢这个替换它:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()) 

您必须使用AdvancedBus所以你必须设置了一切手动。

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())) 
{ 
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct); 
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct); 
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name); 
    bus.Advanced.Bind(deadExchange, queue, ""); 
    bus.Advanced.Bind(textExchange, queue, ""); 

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info)); 
} 

这将死信失败的消息3次。之后,它将进入EasyNetQ提供的默认错误队列进行错误处理。 您可以订阅该队列。

当一个异常传播出您的使用者方法时,一条消息就会死信。所以这会触发死信。

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info) 
{ 
    throw new Exception("This is a test!"); 
} 
+0

寻找你的DeadLetterStrategy有重新提交消息队列之间没有延迟。我希望在下次重新提交之间有一些运行延迟,比如2 * t秒的延迟(通过乘以诸如2的因子来延迟时间的常见策略)。这可以在没有Thread.Sleep的情况下完成吗? – Macko

3

给我所知,没有办法手动acknackreject与EasyNetQ的消息。

我看你有opened an issue ticket with the EasyNetQ team,关于这个......但是没有答案。

FWIW,这是一个非常合适的事情。我使用的所有库都支持此功能集(在NodeJS中),这很常见。我很惊讶EasyNetQ不支持这一点。

+0

看来我要回退到上RabbitMQ.Client如果我想要实现这个功能 –