2016-07-15 21 views
1

我试图使用OffsetCommitRequest其中包含在以下包卡夫卡消费者API 0.9版本: org.apache.kafka.common.requests.OffsetCommitRequest如何向卡夫卡的消费者发送OffsetCommitRequest?

如何发送这个要求吗?什么是使用此的理想方式? 我想在卡夫卡本身提交补偿。我没有找到任何与0.9版相关的文档。它大部分可用于0.8.x

此外,此请求的构造函数需要生成id,成员id和保留时间。这些领域是什么?

+0

通过这个链接https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka – Aby

+0

我已经做到了。这是为旧版本。它使用BlockingChannel发送0.9 – vamosromil

+0

中不可用的请求。0.9版的Kafka文档可以在这里找到:http://kafka.apache.org/090/documentation.html – Aby

回答

3

如果要提交手动偏移,也许你应该设置消费属性

enable.auto.commit=false

,并使用commitSync()或commitAsync()卡夫卡消费者的方法。 例如,您可以在处理完所有ConsumerRecords后调用commitSync()。 或者你也可以只提交你想要的每个接收到的ConsumerRecord之后的TopicPartition。就像这样:

Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(); 
offsetMap.put(new TopicPartition(someTopic, somePartition), new OffsetAndMetadata(someOffset)); 
kafkaConsumer.commitSync(offsetMap); 
+0

这是我采取的最终解决方案。谢谢反正:) 但我仍然想回答我的原始问题。我认为这个请求对象有时可以非常方便。 – vamosromil