2015-08-27 60 views
0

我使用的是amqp.node,我想建立一个系统,每秒从队列中检索1条消息,而不管队列中有多少个。我想检查队列大小,然后我从渠道get虽然。我怎么做?rabbitmq当前的队列大小

consumer.js

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api') 
amqp.connect('<my_rabbitmq_host_info>', function(err, conn){ 
    conn.createChannel(function(err, ch){ 
    var q = 'message-queue' 
    ch.assertQueue(q, {durable: false}, function(err, queue){ 
     console.log(' [*] waiting for messages in queue: %s -- to exit press ctrl+c', q) 

     setInterval(function(){ 
     getMessage(ch, q, queue) 
     }, 1000) 
    }) 
    }) 
}) 

function getMessage(ch, q, queue){ 
    if(!queue){ 
    return 
    } 

    console.log('queue %s has %d messages in it...', q, queue.messageCount) 
    if(queue.messageCount > 0){ 
    console.log('getting 1 message from queue') 
    ch.get(q, {noAck: false}, function(err, msg){ 
     console.log(' [x] message recieved: %s \n\n', msg.content.toString()) 
     ch.ack(msg) 
    }) 
    } 
} 

每次我得到相同数量的queue.messageCount我以为因为我每次发送相同queue实例的功能。我如何获得目前的queue.messageCount

回答

1

您必须在每次想要获得当前尺寸时拨打assertQueue。反转你的代码,这样你就可以每隔一次间隔超时而不是只调用一次。


setInterval(function(){ 

    ch.assertQueue(q, {durable: false}, function(err, queue){ 
    console.log(' [*] waiting for messages in queue: %s -- to exit press ctrl+c', q) 
    getMessage(ch, q, queue) 
    }); 

}, 1000) 

也 - 这对我来说似乎是一个坏主意。我会建议一种不同的方法。为消费者设置the prefetch limit为1。那么你的代码一次只能提取一条消息。

也,这可能帮助:http://dougbarth.github.io/2011/06/10/keeping-the-rabbit-on-a-leash.html - 不同的语言,但其概念应该翻译成节点

+0

感谢德里克。我能够实现令牌桶并在示例应用程序中使用预取,就像这样。工作完美! – rkstar