是否可以通过RabbitMQ发送消息并延迟一段时间? 例如,我想在30分钟后过期客户端会话,并发送30分钟后将处理的消息。RabbitMQ中的延迟消息
回答
这是目前不可能的。您必须将过期时间戳存储在数据库中或类似的东西上,然后有一个帮助程序读取这些时间戳并排队消息。
延迟消息是经常被要求的功能,因为它们在很多情况下都很有用。但是,如果您的需要是过期的客户端会话,我认为消息传递不是您的理想解决方案,而另一种方法可能会更好。
随着RabbitMQ的V2.8的发布,预定交付现已但是作为一个间接的功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
我试过这种方法,但遇到了一些问题,建议任何一个? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 – 2013-01-15 13:46:16
我做了一个秒杀,打了几个showstoppers: 1.消息只有DLQ:en在Q的顶部(http://www.rabbitmq.com/ttl.html - 注意部分) 这意味着如果我第一次设置消息1将在4小时后过期并且msg2在1小时后过期msg2只会在msg1过期后过期。 2.消息的TTL由Rabbit保存,所以可以说你使用10秒的短暂超时。如果消费者在消息过期10秒后仍然无法使用消息(由于积压),消息将被丢弃并丢失 以上内容已通过兔子3.0.1验证 您是否看到任何解决方法? – 2013-01-16 14:51:42
由于我没有足够的声誉添加评论,发布新的答案。这只是在http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html
已经讨论过的内容的补充,除了在消息上设置ttl之外,您可以将其设置在队列级别。此外,您可以避免创建新的交换,只是为了将消息重定向到不同的队列。下面是示例Java代码:
监制:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class DelayedProducer {
private final static String QUEUE_NAME = "ParkingQueue";
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange", "");
arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i=0; i<5; i++) {
String message = "This is a sample message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("message "+i+" got published to the queue!");
Thread.sleep(3000);
}
channel.close();
connection.close();
}
}
消费者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
非常感谢。我认为你在消费者队列中有一个小错误declare channel.queueDeclare(QUEUE_NAME,false,false,false,null);它应该有“DESTINATION_QUEUE_NAME”而不是“QUEUE_NAME”。非常非常感谢你 – 2016-09-19 21:45:42
它看起来像this blog post描述了使用死信交换和消息的TTL做类似的事情。
下面的代码使用CoffeeScript和Node.JS来访问Rabbit并实现类似的东西。
amqp = require 'amqp'
events = require 'events'
em = new events.EventEmitter()
conn = amqp.createConnection()
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
conn.queue key, {
arguments:{
"x-dead-letter-exchange":"immediate"
, "x-message-ttl": 5000
, "x-expires": 6000
}
}, ->
conn.publish key, {v:1}, {contentType:'application/json'}
conn.exchange 'immediate'
conn.queue 'right.now.queue', {
autoDelete: false
, durable: true
}, (q) ->
q.bind('immediate', 'right.now.queue')
q.subscribe (msg, headers, deliveryInfo) ->
console.log msg
console.log headers
假设你有控制权的消费者,可以实现对消费者这样??延迟:
如果我们相信,在队列中的第n个消息总是具有较小的延迟比第n + 1条消息多(这在许多用例中都可以):生产者在任务中发送timeInformation来传达需要执行该任务的时间(currentTime + delay)。消费者:
1)读取任务
2)scheduledTime如果currentTime的> scheduledTime继续。
否则延迟= scheduledTime - currentTime的
睡眠时间由延迟
消费者指示总是与并发参数配置。所以,其他消息将在队列中等待,直到消费者完成工作。所以,这个解决方案可以工作得很好,虽然看起来很尴尬,特别是对于大的时间延迟。
感谢诺曼的回答,我可以在NodeJS中实现它。
代码中的一切都很清楚。 希望它能节省一些人的时间。
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
有两种方法可以尝试:
老方法:设置TTL(生存时间)报头中的每个消息/队列(策略),然后引入DLQ处理它。一旦ttl过期,您的消息将从DLQ移动到主队列,以便您的听众可以处理它。
最新方法:最近的RabbitMQ想出了RabbitMQ的延迟邮件插件,利用它可以实现相同的,因为RabbitMQ的-3.5.8可用此插件的支持。
您可以使用类型x-delayed-message声明交换,然后使用自定义标题x-delay以毫秒为单位发布消息,以表示消息的延迟时间。该消息将在X-delay毫秒
这里更多地传递到相应的队列:git
- 1. 延迟的消息循环与RabbitMQ的
- 2. 我怎么会使用RabbitMQ的延迟消息交换插件在RabbitMQ的发送延迟的消息?
- 3. 消息延迟
- 4. 生产者和消费者之间的RabbitMQ消息延迟
- 5. Spring AMQP中的计划/延迟消息传递RabbitMq
- 6. ToolStripStatusLabel延迟消息
- 7. MDB消息消费延迟
- 8. 春AMQP的消息延迟
- 9. 给每个消息一个自定义延迟(rabbitmq)?
- 10. NServiceBus延迟消息处理
- 11. ShowBalloonTip():消息出现延迟
- 12. 两个jQuery延迟消息?
- 13. NServicebus延迟消息传奇?
- 14. Websocket延迟发送消息
- 15. JMS队列中的延迟消息
- 16. 卡夫卡延迟消息消耗
- 17. 如何测试Flex消息的延迟
- 18. JUnit的延迟断言消息创建
- 19. 延迟NUnit的断言消息评估
- 20. 带消息延迟的批量重启
- 21. RabbitMQ/AMQP中的消息组
- 22. RabbitMQ中的消息丢失
- 23. MQ消息被延迟几天
- 24. NService总线消息延迟问题
- 25. 延迟消息队列最佳实践
- 26. vert.x sockJS实现延迟发送消息
- 27. 延迟或短暂暂停Windows消息
- 28. Google Cloud Messaging(GCM)下行消息延迟
- 29. 延迟调用消息队列?
- 30. C#TCP第一个消息延迟
你需要使用的RabbitMQ? – 2013-02-13 21:49:29
是的,自RabbitMQ-3.5.8开始提供此功能。 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar 2017-06-03 02:11:43
如果你使用Spring AMQP,那么[支持插件](https://docs.spring.io/spring-amqp/reference/htmlsingle /#延迟消息交换)。 – Gruber 2017-09-07 13:19:30