2010-05-10 30 views
6

我想发送消息到RabbitMQ服务器,然后等待回复消息(在“回复”队列中)。当然,我不想永远等待处理这些消息的应用程序停机 - 这需要超时。这听起来像是一项非常基本的任务,但我找不到办法做到这一点。我现在遇到了这个问题py-amqplibRabbitMQ .NET client等待一个超时的单个RabbitMQ消息

到目前为止,我已经得到了最好的解决方案是使用basic_get在两者之间sleep轮询,但是这是很丑陋:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

肯定有一些更好的办法?

回答

8

我刚刚在carrot中增加了对amqplib的超时支持。

这是amqplib.client0_8.Connection一个子类:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi能够接收关于信道的任意数量的 一个版本的channel.wait

我想这可能会合并上游在某些时候。

+1

现在,我称之为“伟大的答案”:“它是固定的”!接受 - 希望它*被合并到amqplib中。 – EMP 2010-05-10 23:08:27

+0

@EMP哈哈:)有趣:) – 2013-07-18 10:51:04

1

这似乎打破了异步处理的整个思路,但是如果你肯定我认为正确的方法是使用RpcClient

+0

虽然RpcClient本身对我来说并不实用,看着它的实现揭示了使用方法:创建一个'QueueingBasicConsumer'和在队列中等待,这支持超时。这在.NET中并不像我担心的那样复杂。 – EMP 2010-05-10 00:57:13

2

有一个例子here使用qpidmsg = q.get(timeout=1)应该做你想做的。对不起,我不知道其他AMQP客户端库实现超时(特别是我不知道你提到的两个具体的)。

+0

看看qpid的源代码,它似乎使用与.NET客户端完全相同的方法:'basic_consume'带有队列并在队列中等待超时。看起来这就是我必须要做的。 – EMP 2010-05-10 00:57:57

8

这里就是我终于实现了在.NET客户端:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

不幸的是,我不能做同样的PY-amqplib,因为它basic_consume方法不调用回调,除非你打电话channel.wait()channel.wait()不支持超时!这个愚蠢的限制(我不断遇到)意味着如果你永远不会收到另一条消息,你的线程将永远冻结。

1

兔子现在允许您添加超时事件。简单地包裹你的代码在尝试捕捉,然后扔在超时异常和断开处理程序:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
}