2017-04-14 33 views
5

有没有办法用Kafka Stream手动提交?如何使用Kafka Stream手动提交?

通常使用的KafkaConsumer,我这样做如下:

while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records){ 
     // process records 
    } 
    consumer.commitAsync(); 
} 

当我打电话手动提交。我没有看到KStream的类似API。

回答

12

提交由Streams在内部处理和完全自动处理,因此通常没有理由手动提交。请注意,Streams处理此操作的方式与使用者自动提交不同 - 事实上,自动提交会禁用内部使用的使用者,而Streams会“手动”管理提交。原因是,提交只能在处理过程中的某些点发生,以确保没有数据丢失(有关更新状态和刷新结果的许多内部依赖关系)。

对于更频繁的提交,可以通过StreamsConfig参数commit.interval.ms减少提交间隔。

尽管如此,通过低杠杆的Processor API可以间接地手动提交。您可以使用通过init()方法提供的context对象调用context#commit()。请注意,这只是一个“Streams请求”,尽快提交 - 它不直接发布提交。

相关问题