1
我正在使用spring-integration-kafka
实施具有自定义确认机制的卡夫卡消费者。使用this example的代码。通过拒绝确认重新读取来自卡夫卡主题的消息
我想要实现的是当抛出异常时,确认不应该发回给Kafka(即不应该执行偏移提交),因此下一个fromKafka.receive(10000)
方法调用将返回与先前相同的消息一。
但是我面临一个问题:即使确认没有发送到Kafka,消费者也知道下一个消息的偏移量,并继续读取新的消息,尽管偏移量主题中的偏移量仍然存在不变。
如何在发生某些故障时让消费者重读消息?
谢谢,我会尝试'ErrorHandler'的方式。但无论如何,适配器如何知道下一条消息的偏移量?它是否在内部存储当前的偏移量? – Aliaxander