2017-07-09 63 views
0

我正在使用Kafka流并希望重置一些消费者偏移量从Java到开始。 KafkaConsumer.seekToBeginning(...)听起来像做正确的事情,但我与卡夫卡流工作:从卡夫卡流重置消费者偏移量

KafkaStreams streams = new KafkaStreams(builder, props); 
... 
streams.start(); 

我想这取决于具体的流管线我确定这将在引擎盖下创建了几位消费者。我可以访问这些吗?还是有其他的方式来重置偏置编程?

回答

1

由于您使用的是Kafka Streams,您不仅需要重置消费者偏移量,还需要重置Streams内部状态存储区。

幸运的是,有一个与Kafka一起提供的Streams Application Reset Tool。

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

+1

这是我已经知道了,见上面......这里的关键词是编程。 – micgn

+0

那么重置工具是一个程序,它是开源的,所以答案是肯定的,你可以以编程方式重置消费者偏移量和流状态存储。 –

0

大厦汉斯Jespersens答案,我成功地使用这个代码来执行脚本执行的Java代码是什么:

import kafka.tools.StreamsResetter; 

StreamsResetter resetter = new StreamsResetter(); 
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER}; 
resetter.run(args); 

类是我进口卡夫卡核心库的一部分在使用maven:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.12</artifactId> 
     <version>${kafka.version}</version> 
    </dependency>