3
我正在使用Spring集成Kafka扩展来读取和处理Java应用程序中Kafka的消息。据我所知,它使用高级消费者API,不允许完全管理Zookeeper中的偏移量。春季集成Kafka和管理偏移量
在我的情况下,我们有auto.commit.enable = false为了在处理消息后向Zookeeper提交偏移量。如果处理失败,偏移将不会被提交,我们应该尝试在一些配置的时间内再次处理同样的消息,从Zookeeper偏移量开始。但它不起作用,因为我猜想,Apache Kafka客户端在内存中保持偏移。
我发现,kafka.consumer.ConsumerIterator处理偏移,如果consumedOffset它比动物园管理员更大然后它会使用消耗一个读消息。
所以,我想知道是否有任何方法可以重置Kafka客户端中的偏移量以开始读取Zookeeper中的偏移量?