有很多方法可以做到这一点。例如,您可以用ManualResetEvent
一起使用EventingBasicConsumer
,像这样的(这只是用于演示目的 - 更好地利用以下方法之一):
var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
// setup signal
using (var signal = new ManualResetEvent(false)) {
var consumer = new EventingBasicConsumer(channel);
byte[] messageBody = null;
consumer.Received += (sender, args) => {
messageBody = args.Body;
// process your message or store for later
// set signal
signal.Set();
};
// start consuming
channel.BasicConsume("your.queue", false, consumer);
// wait until message is received or timeout reached
bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
// cancel subscription
channel.BasicCancel(consumer.ConsumerTag);
if (timeout) {
// timeout reached - do what you need in this case
throw new Exception("timeout");
}
// at this point messageBody is received
}
}
}
正如你在注释中规定 - 如果你希望在同一个队列多个消息,这不是最好的方法。那么这不是最好的方法,我只是为了演示ManualResetEvent
在案例库本身不提供超时支持的情况下使用它。
如果您正在进行RPC(远程过程调用,请求 - 答复) - 您可以在服务器端使用SimpleRpcClient
和SimpleRpcServer
。客户端看起来就像这样:
var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
// do something on timeout
};
var reply = client.Call(myMessage); // will return reply or null if timeout reached
更简单的方法:使用基本Subscription
类(它使用内部同样EventingBasicConsumer
,但支持超时,所以你不必自己实现),像这样:
var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
// timeout
}
来源
2017-05-31 10:04:58
Evk
第一个解决方案无效。 BasicConsume不能保证在BasicCancel上停止使用,它可以稍后做这个,因为它实际上是在执行rabbit(只是尝试每个请求使用一条消息,并且在某些情况下,您会看到几次分配messageBody)。您仍然需要重发多余的消息。对于第二个和第三个,我会尝试现在=) – eocron
虽然你的上半部分是无关的,订阅类正是我想要的!谢谢,它的作品奇妙!你能编辑答案,以便其他人知道最后一个解决了吗? – eocron
不过,它通过实现存储了一堆消息,而我只需要一个=/ – eocron