2013-04-30 256 views
2

我想实现一个简单的生产者 - >卡夫卡 - >消费者应用程序在Java中。我能够成功生成并消费消息,但是当我重新启动消费者时会出现问题,其中一些已消费的消息再次被消费者从Kafka中获取(并非所有消息,但是最后一些消息消耗的消息)。简单的卡夫卡消费者消息传递重复

我在我的消费者中设置了autooffset.reset=largest,我的autocommit.interval.ms属性设置为1000毫秒。

这是'一些已经消耗的消息的重新传递'的一个已知问题,或者是否有任何其他设置,我在这里失踪?

基本上,有没有一种方法可以确保以前消费的消息都不会被消费者拿走/消费?

回答

3

卡夫卡使用Zookeeper存储消费者偏移量。由于Zookeeper操作非常缓慢,建议在消费每条消息之后提交偏移量。

可以向使用者添加关闭挂钩,以便在退出前手动提交主题偏移量。但是,这在某些情况下不会起作用(如jvm crash或kill -9)。为了防止这种情况,我建议实现自定义提交逻辑,在处理每个消息(文件或本地数据库)后在本地提交偏移量,并且每1000毫秒向Zookeeper提交偏移量。在消费者启动时,应查询这两个位置,并且应将最多两个值用作消费抵消。

+0

它可能听起来很愚蠢,但如果我们说例如实现自定义提交逻辑,那么是否可以管理每个消息的偏移量。例如,如果我有两个带有时间戳值的消息,那么我想根据时间戳设置偏移量。因此,如果第二条记录具有较早的时间戳,则分配给它的偏移量应该小于另一个。所以,当我消费时,我会收到已经排序的消息。 – user2720864 2013-09-09 06:35:48