我正在处理大容量数据流〜每秒500个信息,数据在使用带有10个并发使用者的SimpleMessageListenerContainer的Spring AMQP + Rabbit中消耗掉,我必须每15分钟对Db进行一次检查分钟并重新加载某些属性进行处理,这是通过一个石英触发器完成的,该触发器每15分钟启动一次,停止SimplelistenerContainer,完成必要的工作并再次启动容器。Spring AMQP - 重复消息
当应用程序启动时,当触发器触发并且容器重新启动时,我看到相同的消息被多次传递,这会导致很多重复。消费者没有任何放弃。
消息监听器
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
在应用程序启动时设置平行消费者开始消费
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
石英触发
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}
嗨winash,我们将需要看到一些代码和配置来帮助这个。相关的Spring配置将是一个很好的开始,以及Quartz作业代码。 –
编辑原始问题添加代码示例,这或多或少是如何构造代码 – winash
经过一些调试发现该错误是与我的流初始化,我错误地责怪它的兔子,感谢您的帮助 – winash