2015-04-22 33 views

回答

12

你可以在zookeeper shell的帮助下做到这一点。卡夫卡使用动物园管理员来追踪消费者偏移量。

转到卡夫卡bin目录,并调用饲养员壳。(我的卡夫卡的版本是0.8.0)

./zookeeper-shell.sh localhost:2181 

现在使用的动物园管理员get命令

get /consumers/consumer_group_id/offsets/topic/0 

它表明类似

2043 
cZxid = 0x4d 
ctime = Wed Mar 18 03:56:32 EDT 2015 
... 

这里2043是消耗的最大偏移量。通过使用动物园管理员将其设置为所希望的值设定命令

set /consumers/consumer_group_id/offsets/topic/0 10000 

路径尚未这样/消费者/ [consumer_group_id] /偏移/ [主题]/[partition_id时]。
您将不得不用适当的消费者组,主题和分区ID来替换。

*此外,既然您提到这是一个新的卡夫卡实例,我不确定消费者是否会建立连接并创建消费群。

+0

好的答案,这对我的项目真的有帮助! –

1

由于kafka 0.9偏移被存储在一个主题中。要改变偏移,使用seek() method

public void seek(TopicPartition partition, long offset) 

覆盖的获取,消费者将在下一个poll(timeout)使用偏移。如果多次为同一分区调用此API,则将在下一个poll()中使用最新的偏移量。请注意,如果此API在消费过程中被任意使用,您可能会丢失数据,以重置提取偏移量