2010-09-21 222 views
8

我想发送消息给RabbitMQ服务器,然后等待回复消息(在“回复”队列中)。当然,我不想永远等待处理这些消息的应用程序停机 - 这需要超时。这听起来像是一项非常基本的任务,但我找不到办法做到这一点。我现在用Java API遇到了这个问题。RabbitMQ等待超时消息

回答

0

com.rabbitmq.client.QueueingConsumernextDelivery(long timeout)方法,它会做你想要什么。但是,这已被弃用。 编写你自己的超时并不难,尽管有一个正在进行的线程和一个实时标识符列表可能会更好,而不是始终添加和删除使用者和关联的超时线程。

修改即可添加:回复后注意日期!

0

我使用C#通过创建一个对象跟踪对特定消息的响应来处理此问题。它为消息设置一个唯一的回复队列,并订阅它。如果在指定的时间内未收到响应,则倒数计时器会取消订阅,这会删除队列。另外,我拥有可以从我的主线程(使用信号量)或异步(使用回调)同步的方法来利用此功能。

基本上,实施看起来是这样的:

//Synchronous case: 
//Throws TimeoutException if timeout happens 
var msg = messageClient.SendAndWait(theMessage); 

//Asynchronous case 
//myCallback receives an exception message if there is a timeout 
messageClient.SendAndCallback(theMessage, myCallback); 
2

的RabbitMQ的Java客户端库现在supports a timeout argument to its QueueConsumer.nextDelivery() method

例如,RPC教程使用下面的代码:

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    if (delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 

现在,你可以使用consumer.nextDelivery(1000)等待最大一秒钟。如果达到超时,则该方法返回null

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    // Use a timeout of 1000 milliseconds 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); 

    // Test if delivery is null, meaning the timeout was reached. 
    if (delivery != null && 
     delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
}