2017-01-18 39 views
0

我使用Flume 1.7 Kafka source将数据从Apache Kafka中提取到我的AbstractSink中。Flume 1.7&Kafka - 如何在主题开始时重新启动?

:在过去,我可以通过删除使用 ./kafka-consumer-groups.sh --delete但由于水槽1.7(显然)采用的是“新”的消费主题偏移,试图 ./kafka-consumer-groups.sh --delete现在提供了以下错误消息重新启动在主题开始时的偏移

选项[删除]对[新消费者]无效。需要注意的是有没有 必要删除组的元数据新的消费,因为它是 当最后一个成员离开

所以自动删除,什么是实现所需的行为(这是我们将重新推荐的方法 - 从主题开始处理数据)?

这里是我的水槽配置的一部分:

myagent.sources.my-kafka-source.type = org.apache.flume.source.kafka.KafkaSource 
myagent.sources.my-kafka-source.kafka.bootstrap.servers = kafka.example.net:9092 
myagent.sources.my-kafka-source.kafka.consumer.group.id = my-gid 
myagent.sources.my-kafka-source.kafka.topics = my.topic 
myagent.sources.my-kafka-source.kafka.auto.offset.reset = earliest 
myagent.sources.my-kafka-source.channels = my_channel 

回答

0

水槽不提供倒带功能虽然卡夫卡不附带KafkaConsumer#寻求让您重新消耗的消息的直接支持。似乎你必须使用新的组ID来做到这一点,需要重新启动Flume代理。

相关问题