2011-10-14 15 views
4

我有一个简单生产者/消费者AMQP设置是这样的:是否有用于提交/回滚消息处理的amqp体系结构的设计模式?

producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler 

生产者一定数量的就业岗位的发送。消费者一次一个地取下工作,并对其进行处理,并将结果推送到另一个队列中。然后由result_handler将结果发布到数据库。

有时消费者失败 - 它可能被操作系统杀死或抛出异常。如果在处理消息时发生这种情况,则此消息丢失,不会产生相应的结果,我很难过。如果失败的工作重新排队,我会很高兴。

什么我要找的是确保一个设计模式,要么消费者处理该作业完成,并提出相应的结果为* results_queue *,或者如果它失败作业放回* jobs_queue * 。由于消费者是什么故障,消费者不应负责管理任何与其自身监督有关的消息。

我们知道,消费者未能处理作业,如果:

  • 从* job_queue *找了一份工作,有些超时
  • 后没有结果已经生产出来了工作,从* job_queue *然后死亡

对于我的应用程序,我们可能通过简单地等待处理作业超时来捕获第二个案例。在生产中,将有许多工人来监督,所有工作都从共同的工作清单中提取出来,并将结果放入一个结果交换/队列中。

+0

你是否自动确认邮件?大概你可以关闭自动确认,然后在成功处理作业后确认,从而让兔子处理重新发送的消息。 –

+0

可靠消息传递,持久消息队列,确认传递,两阶段确认(接收和保存/转发)以及其他几种模式有几种模式。对于您的平台和需求的详细了解,这不是一个简单的解决方案。 –

回答

2

实现您想要的最简单的方法是手动处理收到的消息的确认。在node-amqp这很简单,只需将选项{ ack: true }添加到queue.subscribe调用。然后你可以通过调用队列上的某个函数来确认消息。在node-amqp的情况下,它是queue.shift()

您还可以使用prefetchCount来设置消费者被允许的尚未确认消息的数量。

如果消费者断开连接,任何未确认的消息现在都会重新发送给任何连接的消费者。

通过将队列设置为durableautoDelete: false,还可以确保在重新启动MQ服务器或断开最后一位使用者时,队列(及其上的消息)不会被删除。

相关问题