2012-11-28 121 views
2

我正在处理大容量数据流〜每秒500个信息,数据在使用带有10个并发使用者的SimpleMessageListenerContainer的Spring AMQP + Rabbit中消耗掉,我必须每15分钟对Db进行一次检查分钟并重新加载某些属性进行处理,这是通过一个石英触发器完成的,该触发器每15分钟启动一次,停止SimplelistenerContainer,完成必要的工作并再次启动容器。Spring AMQP - 重复消息

当应用程序启动时,当触发器触发并且容器重新启动时,我看到相同的消息被多次传递,这会导致很多重复。消费者没有任何放弃。

消息监听器

class RoundRobinQueueListener implements MessageListener { 

@Override 
public void onMessage(Message message) { //do processing 
    } 

    } 

在应用程序启动时设置平行消费者开始消费

final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory); 
     RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance(); 
     messageListenerContainer.setQueueNames(queueName); 
     messageListenerContainer.setMessageListener(roundRobinListener); 
     messageListenerContainer.setConcurrentConsumers(10); 
     messageListenerContainer.setChannelTransacted(true); 

石英触发

void execute(JobExecutionContext context) throws JobExecutionException { 
    messageListenerContainer.stop() 
    //Do db task, other processing 
    messageListenerContainer.start() 
    } 
+0

嗨winash,我们将需要看到一些代码和配置来帮助这个。相关的Spring配置将是一个很好的开始,以及Quartz作业代码。 –

+0

编辑原始问题添加代码示例,这或多或少是如何构造代码 – winash

+0

经过一些调试发现该错误是与我的流初始化,我错误地责怪它的兔子,感谢您的帮助 – winash

回答

0

看起来您的消息现在已被消费者确认。如果您未使用自动确认模式,则需要自行确认该消息(也可以在SimpleMessageListenerContainer配置此消息)。否则,代理会假定邮件未成功处理并尝试再次发送邮件。

+0

我已经设置通道是事务性的,我也没有明确设置任何确认模式(默认是auto)。 它确实看起来像一个确认问题,但队列永远不会建立,即消息确实得到处理。 – winash

+0

交易是否正确提交?在rmq中启用调试日志记录,并检查消息是否出列。 – mbelow

+0

现在就试试这个吧 – winash