2016-08-23 45 views
1

基于spring-kafka的文档,我使用基于Annotation的@KafkaListener来配置我的客户。Spring Kafka倾听所有主题并调整分区偏移

我看到的是 -

  1. 除非我指定的偏移量为零,同比开始,卡夫卡消费者拿起未来的消息,而不是现有的。 (我明白这是一个预期的结果,因为我没有指定偏移量,我想要的)

  2. 我在文档中看到一个选项来指定一个主题+分区组合,以及零偏移,但如果我这样做 - 我必须明确指定我希望消费者倾听哪个主题。

使用方法2以上,这是我的消费者现在的样子 -

@KafkaListener(id = "{group.id}", 
     topicPartitions = { 
       @TopicPartition(topic = "${kafka.topic.name}", 
         partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")) 
     }, 
     containerFactory = "kafkaListenerContainerFactory") 
public void listen(@Payload String payload, 
        Acknowledgment ack) throws InterruptedException, IOException { 

    logger.debug("This is what we received in the Kafka Consumer = " + payload); 

    idService.process(payload); 

    ack.acknowledge(); 
} 

虽然我明白,有指定“topicPattern”百搭卡或“主题”列表中的一个选项作为注释配置的一部分,我没有看到一个地方可以提供偏移值,从零开始列出主题/主题模式。有没有办法做到两者结合?请指教。

回答

3

当使用主题和topicPatterns(而不是显式声明分区)时,Kafka决定哪个消费者实例将得到哪些分区。

Kafka将分配分区,并且初始偏移量将是针对该组ID最后提交的。您目前无法更改该抵消额,但我们正在考虑添加一个seek function

如果你总是希望在开始第一个可用的偏移,使用一个唯一的组ID(如UUID.randomUUID().toString()),并设置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

因为卡夫卡将没有现有的该组ID,将使用该属性偏移确定从哪里开始。

您也可以使用MANUAL ack模式并且从不ack,这将有效地执行相同的操作。

+0

topicPattern支持使用通配符?就像我提供topicPattern =“test *”一样。我的消费者是否应该从test1,test2,test3主题收听? – Satya

+1

该模式是一个正则表达式(正则表达式) - 使用'test。*'。 - 'test *'表示'tes','test','testt' --' tes',接着是0或更多't's。 –

相关问题