2014-06-18 38 views
3

我正在使用Spring集成Kafka扩展来读取和处理Java应用程序中Kafka的消息。据我所知,它使用高级消费者API,不允许完全管理Zookeeper中的偏移量。春季集成Kafka和管理偏移量

在我的情况下,我们有auto.commit.enable = false为了在处理消息后向Zookeeper提交偏移量。如果处理失败,偏移将不会被提交,我们应该尝试在一些配置的时间内再次处理同样的消息,从Zookeeper偏移量开始。但它不起作用,因为我猜想,Apache Kafka客户端在内存中保持偏移。

我发现,kafka.consumer.ConsumerIterator处理偏移,如果consumedOffset它比动物园管理员更大然后它会使用消耗一个读消息。

所以,我想知道是否有任何方法可以重置Kafka客户端中的偏移量以开始读取Zookeeper中的偏移量?

回答

1

你不介意分享你的<int-kafka>配置,并指出你在哪里有问题?

也许这样做就足够了MessageLeftOverTracker.clearMessagesLeftOver()

我不喜欢Kafka,但知道Spring集成的功能。