我正在寻找一种方法从兔死亡队列队列中取出物品并将其重新处理回队列中。有没有内置的方法来做到这一点?RabbitMQ重新处理死亡队列队列
0
A
回答
0
您可以像使用常规队列一样使用来自DLX(实际上来自DLQ)的消息。你提出的建议(从DLQ获取消息并将它发布到队列中,它最初是从静止的队列中)可能会导致消息循环(通常它会)。
最佳实践方式是以某种单独的方式处理死锁消息,或者根本不要盲注它们。
0
我没有找到一个内置的方式来做到这一点,所以我创建了我自己的小解决方案。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.v0_9_1;
namespace RabbitMQReprocessDeadLetter
{
public class RabbitReprocessor
{
private readonly IModel _model;
private readonly string _deadLetterQueueName;
private const ushort FetchSize = 10;
private const string ConsumerName = "DeadLetterReprocessor";
public RabbitReprocessor(IConnection rabbitConnection, string deadLetterQueueName)
{
_deadLetterQueueName = deadLetterQueueName;
_model = rabbitConnection.CreateModel();
}
public void StartConsuming(CancellationTokenSource cancellationTokenSource = null)
{
// Configure the Quality of service for the model. Below is how what each setting means.
// BasicQos(0="Dont send me a new message untill I’ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
_model.BasicQos(0, FetchSize, false);
var queueingBasicConsumer = new QueueingBasicConsumer(_model);
_model.BasicConsume(_deadLetterQueueName, false, ConsumerName, queueingBasicConsumer);
while (true)
{
if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
{
return;
}
var e = queueingBasicConsumer.Queue.Dequeue(); // blocking call
var deathProperties = (List<object>) e.BasicProperties.Headers["x-death"];
var prop = (Dictionary<string, object>)deathProperties.Single();
var queueAsByteArray = (byte[])prop["queue"];
var queueName = queueAsByteArray.ConvertToString();
var data = e.Body;
try
{
Console.WriteLine("{0} => {1}", queueName, data.Deserialize<long>());
}
// ReSharper disable once EmptyGeneralCatchClause
catch { }
SendMessageToQueue(queueName, data);
_model.BasicAck(e.DeliveryTag, false);
}
}
/// <summary>
/// delivery the message directly into the queue from which it came.
///
/// You may want to put it back into an exchange instead of a queue.
/// </summary>
private void SendMessageToQueue(string queueName, byte[] messageBytes)
{
const string exchangeName = "";
if (string.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("queueName");
}
if (messageBytes == null)
{
throw new ArgumentNullException("messageBytes");
}
var basicProperties = new BasicProperties
{
DeliveryMode = 2//2 = durable
};
_model.BasicPublish(exchangeName, queueName, basicProperties, messageBytes);
}
}
}
完整的解决方案可以在这里找到:
的情况下相关问题
- 1. RabbitMQ死信交换/队列
- 2. 使用RabbitMQ逐一处理队列
- 3. Python队列似乎正在死亡
- 4. RabbitMQ队列订单管理
- 5. RabbitMQ - parellel队列
- 6. RabbitMQ队列窥探
- 7. Rabbitmq队列分片
- 8. RabbitMQ队列组织
- 9. 使用MSMQ死信队列或管理队列处理未发送的消息?
- 10. 死信队列和毒物队列
- 11. 并行处理多个rabbitmq队列的Spark Streaming处理
- 12. RabbitMQ/AMQP HA App:消费者死亡/退出后的持续队列?
- 13. 死信队列和退队队列有什么区别?
- 14. 去队列处理失败后重试
- 15. 定义队列中的RabbitMQ
- 16. 芹菜+ RabbitMQ的空队列
- 17. RabbitMQ:检查队列存在
- 18. Rabbitmq镜像队列性能
- 19. RabbitMQ挂起队列消耗
- 20. 的RabbitMQ AMQP队列设计
- 21. RabbitMQ,EasyNetQ队列名称
- 22. NServiceBus不创建RabbitMQ队列
- 23. RabbitMQ - 非持久队列
- 24. RabbitMQ - 消耗多个队列
- 25. RabbitMQ和队列数据
- 26. 多处理和队列
- 27. 如何处理队列?
- 28. Nodejs sqs队列处理器
- 29. 多处理队列已满
- 30. 线程/多处理/队列?
那里是没有错的信息,但我们在处理一个错误(如丢失连接等)它会方便重新发送消息并尝试重新处理。我同意,如果死信将所有事情自动回馈给大多数情况下会导致无限循环。 – jhilden
使用消息确认可防止消费者意外失败时丢失消息。 – pinepain
Zaq,我们克服了失败,使他们不会永远陷入队列中。我喜欢你的概念,我们只需要确保那些真正应该被忽略的东西,而且可以重新尝试的东西仍然存在。 – jhilden