2017-09-06 28 views
0

每当我发布新消息时,它都会创建一个新连接。我想只有一个连接和一个通道用于所有公共电话。对于节点应用程序只需要一个RbbitMQ连接,而不是每个发布调用

从rabbitmq网站阅读: 某些应用程序需要多个连接到AMQP代理。但是,不希望同时打开许多TCP连接,因为这样做会占用系统资源并使配置防火墙变得更加困难。 AMQP 0-9-1连接与可被认为是“共享单个TCP连接的轻量级连接”的通道复用。

但是如何? 这里是我的代码:

Channel.js

var amqp = require('amqplib/callback_api'); 

var url = process.env.AMQP_URL || 'amqp://guest:[email protected]:5672'; 

module.exports = createQueueChannel; 

function createQueueChannel(queue, cb) { 
    console.log("connecting................"); 
    amqp.connect(url, onceConnected); 

    function onceConnected(err, conn) { 
    if (err) { 
     console.error('Error connecting:', err.stack); 
    } 
    else { 
     console.log('connected'); 
     conn.createChannel(onceChannelCreated); 
    } 

    function onceChannelCreated(err, channel) { 
     if (err) { 
     cb(err); 
     } 
     else { 
     channel.assertQueue(queue, {durable: true}, onceQueueCreated); 
     } 

     function onceQueueCreated(err) { 
     if (err) { 
      cb(err); 
     } 
     else { 
      cb(null, channel, conn); 
     } 
     } 
    } 
    } 

} 

Publish.js

var Channel = require('./channel'); 

var queue = 'queue'; 

Channel(queue, function(err, channel, conn) { 
    if (err) { 
    console.error(err.stack); 
    } 
    else { 
    console.log('channel and queue created'); 
    var work = 'Do some work'; 
    channel.sendToQueue(queue, encode(work), { 
     persistent: true 
    }); 
    // setImmediate(function() { 
    // channel.close(); 
    // conn.close(); 
    // }); 
    } 
}); 


function encode(doc) { 
    return new Buffer(JSON.stringify(doc)); 
} 

回答

0

定义您的发布功能外连接(amqpConn)和发行渠道(pubChannel),并使用该频道,当你发布消息。

我建议你看一看完整的示例代码浏览:https://gist.github.com/carlhoerberg/006b01ac17a0a94859bahttps://www.cloudamqp.com/blog/2015-05-19-part2-2-rabbitmq-for-beginners_example-and-sample-code-node-js.html) 哪里还脱机队列中情况下使用的连接被中断了一段时间。

并且当连接时,您启动发布者。

function whenConnected() { 
    startPublisher() 
} 


var pubChannel = null; 
var offlinePubQueue = []; 
function startPublisher() { 
    amqpConn.createConfirmChannel(function(err, ch) { 
    if (closeOnErr(err)) return; 
    ch.on("error", function(err) { 
    console.error("[AMQP] channel error", err.message); 
    }); 
    ch.on("close", function() { 
    console.log("[AMQP] channel closed"); 
    }); 

    pubChannel = ch; 
    while (true) { 
    var m = offlinePubQueue.shift(); 
    if (!m) break; 
    publish(m[0], m[1], m[2]); 
    } 
}); 

}

和发布功能,如:

function publish(exchange, routingKey, content) { 
    try { 
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, 
     function(err, ok) { 
     if (err) { 
      console.error("[AMQP] publish", err); 
      offlinePubQueue.push([exchange, routingKey, content]); 
      pubChannel.connection.close(); 
     } 
     } 
    ); 
    } catch (e) { 
    console.error("[AMQP] publish", e.message); 
    offlinePubQueue.push([exchange, routingKey, content]); 
    } 
} 
相关问题