2017-05-22 45 views
9

RabbitMQ Wait for a message with a timeoutWait for a single RabbitMQ message with a timeout中的解决方案似乎不起作用,因为官方C#库中没有下一个传递方法,并且QueueingBasicConsumer被删除,所以它只是在任何地方抛出NotSupportedException异常。C#RabbitMQ为指定的超时等待一条消息?

我如何等待队列中的单个消息指定超时?

PS

它可以通过Basic.Get(),是的,但很好,这是不好解决拉specififed间隔消息(超额流量,多余的CPU)来完成。

更新

EventingBasicConsumer通过implmenetation不支持立即取消。即使您在某个时候调用BasicCancel,即使您指定通过BasicQos进行预取,它仍将在中取回,并且这些帧可以包含多个消息。所以,单个任务执行不好。不要打扰 - 它不适用于单条消息。

回答

5

有很多方法可以做到这一点。例如,您可以用ManualResetEvent一起使用EventingBasicConsumer,像这样的(这只是用于演示目的 - 更好地利用以下方法之一):

var factory = new ConnectionFactory(); 
using (var connection = factory.CreateConnection()) { 
    using (var channel = connection.CreateModel()) { 
     // setup signal 
     using (var signal = new ManualResetEvent(false)) { 
      var consumer = new EventingBasicConsumer(channel); 
      byte[] messageBody = null;       
      consumer.Received += (sender, args) => { 
       messageBody = args.Body; 
       // process your message or store for later 
       // set signal 
       signal.Set(); 
      };    
      // start consuming 
      channel.BasicConsume("your.queue", false, consumer); 
      // wait until message is received or timeout reached 
      bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10)); 
      // cancel subscription 
      channel.BasicCancel(consumer.ConsumerTag); 
      if (timeout) { 
       // timeout reached - do what you need in this case 
       throw new Exception("timeout"); 
      } 

      // at this point messageBody is received 
     } 
    } 
} 

正如你在注释中规定 - 如果你希望在同一个队列多个消息,这不是最好的方法。那么这不是最好的方法,我只是为了演示ManualResetEvent在案例库本身不提供超时支持的情况下使用它。

如果您正在进行RPC(远程过程调用,请求 - 答复) - 您可以在服务器端使用SimpleRpcClientSimpleRpcServer。客户端看起来就像这样:

var client = new SimpleRpcClient(channel, "your.queue"); 
client.TimeoutMilliseconds = 10 * 1000; 
client.TimedOut += (sender, args) => { 
    // do something on timeout 
};      
var reply = client.Call(myMessage); // will return reply or null if timeout reached 

更简单的方法:使用基本Subscription类(它使用内部同样EventingBasicConsumer,但支持超时,所以你不必自己实现),像这样:

var sub = new Subscription(channel, "your.queue"); 
BasicDeliverEventArgs reply; 
if (!sub.Next(10 * 1000, out reply)) { 
    // timeout 
} 
+0

第一个解决方案无效。 BasicConsume不能保证在BasicCancel上停止使用,它可以稍后做这个,因为它实际上是在执行rabbit(只是尝试每个请求使用一条消息,并且在某些情况下,您会看到几次分配messageBody)。您仍然需要重发多余的消息。对于第二个和第三个,我会尝试现在=) – eocron

+0

虽然你的上半部分是无关的,订阅类正是我想要的!谢谢,它的作品奇妙!你能编辑答案,以便其他人知道最后一个解决了吗? – eocron

+0

不过,它通过实现存储了一堆消息,而我只需要一个=/ – eocron