2
我想明白为什么当使用amqp.node(https://github.com/squaremo/amqp.node),似乎我排队之前,我已经排队或处理这些消息的所有消息。我希望发送和接收是交错的。异步发送和收听与amqp.node
发送 - 接收 - 发送 - 接收
,但我得到
发送 - 发送 - 接收器接收
在考察我的过程中我看到,我会第一时间通过所有的前发我得到任何收到。我已经写了我有
var amqp = require('amqplib');
var prettyHrtime = require('pretty-hrtime');
var when = require('when');
var firstMesssage = false;
var time, diff, totaltime;
var received = 0;
amqp.connect('amqp://localhost', { noDelay: false }).then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ok = ch.assertQueue('hello', {durable: false});
ok = ok.then(function(_qok) {
return ch.consume('hello', function(msg) {
if(!firstMesssage){
time = process.hrtime();
firstMesssage = true;
}
received++;
console.log(new Date().getTime());
console.log(" [x] Received '%s' %s", msg.content.toString(), received);
if(received >= 10000){
diff = process.hrtime(time);
var words = prettyHrtime(diff);
console.log(words);
}
}, {noAck: true});
});
return ok.then(function(_consumeOk) {
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}).then(null, console.warn);
var numberToSend = 10000;
var numberDone = 0;
amqp.connect('amqp://localhost', { noDelay: false }).then(function(conn) {
return when(conn.createChannel().then(function(ch) {
var q = 'hello';
var msg = 'Hello World!';
var ok = ch.assertQueue(q, {durable: false});
function sendMsg(){
ch.sendToQueue(q, new Buffer(msg));
console.log(" [x] Sent '%s'", msg, new Date().getTime());
numberDone++;
}
return ok.then(function(_qok) {
while(numberDone < numberToSend){
sendMsg();
}
console.log('*** done');
return ch.close();
});
})).ensure(function() { conn.close(); });
}).then(null, console.warn);
答案是完全正确的,但问题的例子是有点做作。典型的使用node和rabbitmq进行设置的过程与生产者和消费者具有不同的过程,所以交织发布和订阅的“线程阻塞”问题并不常见。 – 2015-04-09 09:47:55