我想实现一个简单的生产者 - >卡夫卡 - >消费者应用程序在Java中。我能够成功生成并消费消息,但是当我重新启动消费者时会出现问题,其中一些已消费的消息再次被消费者从Kafka中获取(并非所有消息,但是最后一些消息消耗的消息)。简单的卡夫卡消费者消息传递重复
我在我的消费者中设置了autooffset.reset=largest
,我的autocommit.interval.ms
属性设置为1000毫秒。
这是'一些已经消耗的消息的重新传递'的一个已知问题,或者是否有任何其他设置,我在这里失踪?
基本上,有没有一种方法可以确保以前消费的消息都不会被消费者拿走/消费?
它可能听起来很愚蠢,但如果我们说例如实现自定义提交逻辑,那么是否可以管理每个消息的偏移量。例如,如果我有两个带有时间戳值的消息,那么我想根据时间戳设置偏移量。因此,如果第二条记录具有较早的时间戳,则分配给它的偏移量应该小于另一个。所以,当我消费时,我会收到已经排序的消息。 – user2720864 2013-09-09 06:35:48