2010-12-14 177 views
23

是否可以通过RabbitMQ发送消息并延迟一段时间? 例如,我想在30分钟后过期客户端会话,并发送30分钟后将处理的消息。RabbitMQ中的延迟消息

+0

你需要使用的RabbitMQ? – 2013-02-13 21:49:29

+1

是的,自RabbitMQ-3.5.8开始提供此功能。 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar 2017-06-03 02:11:43

+0

如果你使用Spring AMQP,那么[支持插件](https://docs.spring.io/spring-amqp/reference/htmlsingle /#延迟消息交换)。 – Gruber 2017-09-07 13:19:30

回答

5

这是目前不可能的。您必须将过期时间戳存储在数据库中或类似的东西上,然后有一个帮助程序读取这些时间戳并排队消息。

延迟消息是经常被要求的功能,因为它们在很多情况下都很有用。但是,如果您的需要是过期的客户端会话,我认为消息传递不是您的理想解决方案,而另一种方法可能会更好。

9

随着RabbitMQ的V2.8的发布,预定交付现已但是作为一个间接的功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

我试过这种方法,但遇到了一些问题,建议任何一个? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 – 2013-01-15 13:46:16

+5

我做了一个秒杀,打了几个showstoppers: 1.消息只有DLQ:en在Q的顶部(http://www.rabbitmq.com/ttl.html - 注意部分) 这意味着如果我第一次设置消息1将在4小时后过期并且msg2在1小时后过期msg2只会在msg1过期后过期。 2.消息的TTL由Rabbit保存,所以可以说你使用10秒的短暂超时。如果消费者在消息过期10秒后仍然无法使用消息(由于积压),消息将被丢弃并丢失 以上内容已通过兔子3.0.1验证 您是否看到任何解决方法? – 2013-01-16 14:51:42

6

由于我没有足够的声誉添加评论,发布新的答案。这只是在http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

已经讨论过的内容的补充,除了在消息上设置ttl之外,您可以将其设置在队列级别。此外,您可以避免创建新的交换,只是为了将消息重定向到不同的队列。下面是示例Java代码:

监制:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

消费者:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

非常感谢。我认为你在消费者队列中有一个小错误declare channel.queueDeclare(QUEUE_NAME,false,false,false,null);它应该有“DESTINATION_QUEUE_NAME”而不是“QUEUE_NAME”。非常非常感谢你 – 2016-09-19 21:45:42

5

它看起来像this blog post描述了使用死信交换和消息的TTL做类似的事情。

下面的代码使用CoffeeScript和Node.JS来访问Rabbit并实现类似的东西。

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

假设你有控制权的消费者,可以实现对消费者这样??延迟:

如果我们相信,在队列中的第n个消息总是具有较小的延迟比第n + 1条消息多(这在许多用例中都可以):生产者在任务中发送timeInformation来传达需要执行该任务的时间(currentTime + delay)。消费者:

1)读取任务

2)scheduledTime如果currentTime的> scheduledTime继续。

否则延迟= scheduledTime - currentTime的

睡眠时间由延迟

消费者指示总是与并发参数配置。所以,其他消息将在队列中等待,直到消费者完成工作。所以,这个解决方案可以工作得很好,虽然看起来很尴尬,特别是对于大的时间延迟。

5

感谢诺曼的回答,我可以在NodeJS中实现它。

代码中的一切都很清楚。 希望它能节省一些人的时间。

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

有两种方法可以尝试:

老方法:设置TTL(生存时间)报头中的每个消息/队列(策略),然后引入DLQ处理它。一旦ttl过期,您的消息将从DLQ移动到主队列,以便您的听众可以处理它。

最新方法:最近的RabbitMQ想出了RabbitMQ的延迟邮件插件,利用它可以实现相同的,因为RabbitMQ的-3.5.8可用此插件的支持。

您可以使用类型x-delayed-message声明交换,然后使用自定义标题x-delay以毫秒为单位发布消息,以表示消息的延迟时间。该消息将在X-delay毫秒

这里更多地传递到相应的队列:git