2012-06-28 40 views
1

是否有一种常见模式用于使0mq pub/sub中的发布者在节点中冗余?动机是能够与可能会失败/定期重新启动的发布者一起运行多个进程。如何在Node.js中创建一个ZeroMQ发布者冗余?

我最初的想法是在主创建一个转发器,并从工人出版商连接到它:

var cluster = require('cluster') 
    , zmq = require('zmq') 
    , endpointIn = 'ipc:///tmp/cluster_pub_sub' 
    , endpointOut = 'tcp://127.0.0.1:7777'; 

if (cluster.isMaster) { 
    for (var i = 0; i < 2; i++) cluster.fork(); 
    startPubSubForwarder(); 
} else { 
    startPublisher(); 
} 

function startPublisher() { 
    var socket = zmq.socket('pub'); 
    socket.connect(endpointIn); 
    setInterval(function() { 
    socket.send('pid=' + process.pid); 
    }, 1000); 
} 

function startPubSubForwarder() { 
    var sIn = zmq.socket('sub') 
    , sOut = zmq.socket('pub'); 

    // incoming 
    sIn.subscribe(''); 
    sIn.bind(endpointIn, function (err) { 
    if (err) throw err; 
    }); 
    sIn.on('message', function (data) { 
    sOut.send(data); 
    }); 

    // outgoing 
    sOut.bind(endpointOut, function (err) { 
    if (err) throw err; 
    }); 
} 

这样做的还有其他/更好的方法?

回答

0

如果您的担心是消息持久性,那么我认为您不必担心有多个发布者,更担心在发布者死后确保邮件不会丢失。您可以立即重新启动发布服务器,然后选择停止发布的位置。您还需要知道哪些消息已成功发送。

这需要的是1)持久性存储和2)以及向发布者确认消息在接收端被接收(并且可能处理完成)的手段。这个设置应该解决您的可靠性需求。

如果你还想完成高规模,那么你需要增加一点体系结构。对于发送方和接收方为1:1的发送/接收方案来说更直接,当您需要执行1:N轮循机制/负载分配方案时更复杂一些,这可能是您需要的扩展。

我上完成的升级场景输入是具有以下设置:

sender_process - (1:1) - >分配器 - (1:N) - > receiver_process(ES )

其中分发者确认收到来自发送者的消息,然后扇出到接收者进程。

您可能希望通过在每个进程前面放置一个队列来完成此操作。所以,你不会发送进程。您发送到进程读取的队列。发件人将东西放在分配器队列中。经销商将东西放在接收器的队列中。在每个点上,每个进程都试图进行处理。如果它失败了一些最大重试次数,它会进入一个错误队列。

我们使用rabbitmq/amqp来完成所有这些工作。我已经开始公开我们用来执行1:1和1:N发送的总线:https://github.com/mateodelnorte/servicebus。我将在接下来的几天内添加自述文件和更多测试。

+0

(我的问题肯定是关于你的第二个例子)。我担心“货运代理商/分销商”箱子正在倒塌。我怎样才能有两个或更多的这些“转发/分销”流程? HAProxy(或任何其他TCP代理)是唯一的方法吗?或者0mq中有什么可以执行负载平衡/故障转移? –

+0

您不希望每个消息类型使用多个分发服务器。您的一位经销商将从1个发件人发送邮件到N个收件人。 您应该只能使用0mq作为您的分销商的运输工具。您必须编写一些自定义代码来订阅一条消息并发送给N个侦听器,以及任何错误队列类型方案。 – MateodelNorte

+0

还有一件需要注意的事情: 由于您的邮件是持久性的,如果分销商发生故障,它会在邮件被保存在队列中时从其停止的地方开始发送。 – MateodelNorte

0

从您的示例代码中,我认为XPUB/XSUB 0MQ模式是您最合适的。这是实现同一个“startPubSubForwarder()”块的更有效的方式,而且您的订阅方可以直接在发布服务器后端订阅特定模式。在这里,我用publishers/xpub-xsub(以代理方式)/下标:https://github.com/krakatoa/node_zmq_workshop/tree/master/03.3_news_proxy的例子离开了一个链接。这是NodeJS代码(它是我的,不介意问细节!)。