2015-10-21 36 views
1

我正在使用spring-integration-kafka实施具有自定义确认机制的卡夫卡消费者。使用this example的代码。通过拒绝确认重新读取来自卡夫卡主题的消息

我想要实现的是当抛出异常时,确认不应该发回给Kafka(即不应该执行偏移提交),因此下一个fromKafka.receive(10000)方法调用将返回与先前相同的消息一。

但是我面临一个问题:即使确认没有发送到Kafka,消费者也知道下一个消息的偏移量,并继续读取新的消息,尽管偏移量主题中的偏移量仍然存在不变。

如何在发生某些故障时让消费者重读消息?

回答

1

有目前未重新获取失败消息的任何支持

一两件事你可以做的就是添加重试(例如,使用请求处理程序重试咨询)消息驱动适配器的下游。

通过不反应,消息将在重新启动后传递,但不会在当前实例化期间传递。因为消息被预取到适配器中,所以你可以做的一件事是检测失败,停止适配器,排出预取的消息并重新启动。

您可以注入一个自定义ErrorHandler来停止适配器,并向您的下游流发送信号,表明它应该忽略排水消息。

+0

谢谢,我会尝试'ErrorHandler'的方式。但无论如何,适配器如何知道下一条消息的偏移量?它是否在内部存储当前的偏移量? – Aliaxander