2015-11-12 40 views
6

我是卡夫卡的新手,试图了解是否有方法从上次消费的偏移量中读取消息,但不能从头开始。如何使卡夫卡消费者从上次消费的偏移量读取,但不是从开始

我在写一个案例,以便我的意图不会偏离。

Eg: 
1) I produced 5 messages at 7:00 PM and console consumer consumed those. 
2) I stopped consumer at 7:10 PM 
3) I produced 10 message at 7:20 PM. No consumer had read those messages. 
4) Now, i have started console consumer at 7:30 PM, without from-beginning. 
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM 

有没有办法从上一次消耗的偏移量中产生消息?

回答

4

我是新来的卡夫卡,并试图了解如果有一种方法来读取消息从最后消费的偏移量,但不是从开始。

是的,可以使用控制台使用者从上次消耗的偏移量中读取数据。您必须在调用kafka-console-consumer时添加consumer.config标志。

实施例: -

[[email protected] bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties 

这里/home/mrnakumar/consumer.properties是含有group.id的文件。 这里是/home/mrnakumar/consumer.properties的样子: -

group.id = consoleGroup

Withoug使用consumer.config,可无论是从开始[被阅读使用 - from-beginning]或仅限Log的结尾。日志结束意味着消费者启动后发布的所有消息。

+1

是的,如果我们给任何组ID,然后将数据从得到的最后消耗点读。如果我们在没有组ID的情况下运行,它仅在开始后才考虑数据。谢谢.. – Srini

+0

是否也可以通过控制台使用者为特定使用者组提交偏移量? – clausmc

3

您应该在largest的消费者配置中设置auto.offset.reset参数,以便它读取上次提交的偏移量之后的所有消息。

+0

@Srini您不必将此属性设置为数值,此属性的值应该是“最大的”,以便从流的末尾消耗。 – leshkin

+0

为我工作。谢谢! 我刚加入consumer.properties文件中的以下行: auto.offset.reset =最大 –