2015-04-01 24 views
2

我想明白为什么当使用amqp.node(https://github.com/squaremo/amqp.node),似乎我排队之前,我已经排队或处理这些消息的所有消息。我希望发送和接收是交错的。异步发送和收听与amqp.node

发送 - 接收 - 发送 - 接收

,但我得到

发送 - 发送 - 接收器接收

在考察我的过程中我看到,我会第一时间通过所有的前发我得到任何收到。我已经写了我有

var amqp = require('amqplib'); 
var prettyHrtime = require('pretty-hrtime'); 
var when = require('when'); 

var firstMesssage = false; 
var time, diff, totaltime; 
var received = 0; 

amqp.connect('amqp://localhost', { noDelay: false }).then(function(conn) { 
    process.once('SIGINT', function() { conn.close(); }); 
    return conn.createChannel().then(function(ch) { 

    var ok = ch.assertQueue('hello', {durable: false}); 

    ok = ok.then(function(_qok) { 
     return ch.consume('hello', function(msg) { 

     if(!firstMesssage){ 
      time = process.hrtime(); 
      firstMesssage = true; 
     } 

     received++; 
     console.log(new Date().getTime()); 
     console.log(" [x] Received '%s' %s", msg.content.toString(), received); 
     if(received >= 10000){ 
      diff = process.hrtime(time); 
      var words = prettyHrtime(diff); 
      console.log(words); 
     } 
     }, {noAck: true}); 
    }); 

    return ok.then(function(_consumeOk) { 
     console.log(' [*] Waiting for messages. To exit press CTRL+C'); 
    }); 
    }); 
}).then(null, console.warn); 



var numberToSend = 10000; 
var numberDone = 0; 

amqp.connect('amqp://localhost', { noDelay: false }).then(function(conn) { 
    return when(conn.createChannel().then(function(ch) { 
    var q = 'hello'; 
    var msg = 'Hello World!'; 

    var ok = ch.assertQueue(q, {durable: false}); 

    function sendMsg(){ 
     ch.sendToQueue(q, new Buffer(msg)); 
     console.log(" [x] Sent '%s'", msg, new Date().getTime()); 
     numberDone++; 
    } 

    return ok.then(function(_qok) { 
     while(numberDone < numberToSend){ 
     sendMsg(); 
     } 
     console.log('*** done'); 
     return ch.close(); 

    }); 
    })).ensure(function() { conn.close(); }); 
}).then(null, console.warn); 

回答

1

节点问题的一个简单的演示是单线程的,因此无法过程中的任何将到来的消息,直到你while循环,它所有的发送已完成。

尝试包装你sendMsgsetImmediate电话,看看如何改变的事情(它必须被转换成一个承诺和链接,以便通道和连接不关闭一切发送之前)

function sendMsg(resolve){ 
    setImmediate(function() { 
    ch.sendToQueue(q, new Buffer(msg)); 
    console.log(" [x] Sent '%s'", msg, new Date().getTime()); 
    resolve(); 
    }); 
} 

return ok.then(function(_qok) { 
    var sendOk = when.promise(sendMsg); 
    while(numberDone < numberToSend){ 
    sendOk = sendOk.then(function() { 
     return when.promise(sendMsg); 
    }); 
    numberDone++; 
    } 

    return sendOk.finally(function() { 
    console.log('*** done'); 
    ch.close(); 
    }); 
}); 

你可以看到差异,哪怕只是5消息:

[x] Sent 'Hello World!' 1428467734962 
[*] Waiting for messages. To exit press CTRL+C 
[x] Sent 'Hello World!' 1428467734963 
1428467734965 
[x] Received 'Hello World!' 1 
[x] Sent 'Hello World!' 1428467734965 
[x] Sent 'Hello World!' 1428467734965 
[x] Sent 'Hello World!' 1428467734966 
[x] Sent 'Hello World!' 1428467734966 
*** done 
1428467735004 
[x] Received 'Hello World!' 2 
1428467735005 
[x] Received 'Hello World!' 3 
1428467735005 
[x] Received 'Hello World!' 4 
1428467735005 
[x] Received 'Hello World!' 5 
+0

答案是完全正确的,但问题的例子是有点做作。典型的使用node和rabbitmq进行设置的过程与生产者和消费者具有不同的过程,所以交织发布和订阅的“线程阻塞”问题并不常见。 – 2015-04-09 09:47:55