2016-03-22 152 views
2

我使用KafkaConsumer消耗从卡夫卡服务器(主题)的消息..卡夫卡消费者动态检测主题添加

  • 它工作正常的消费者开始编码之前创建的主题...

但问题是,如果主题创建动态(我的意思是说消费者代码开始后)说,它将无法正常工作,但API说它将支持动态主题创建..这里是链接供您参考..

使用的卡夫卡版本:0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

这里是JAVA代码...

Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    Pattern r = Pattern.compile("siddu(\\d)*"); 

    consumer.subscribe(r, new HandleRebalance()); 
    try { 
     while(true) { 
      ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
      for (TopicPartition partition : records.partitions()) { 
       List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); 
       for (ConsumerRecord<String, String> record : partitionRecords) { 
        System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value()); 
       } 
       long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 

       consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); 
      } 
     } 
    } finally { 
     consumer.close(); 
    } 

注:我的主题名称相匹配的正则表达式。 如果我重新开始消费,然后就会开始读邮件推主题...

任何帮助,非常感谢......

回答

2

您可以挂接到动物园管理员。退房the sample code。实质上,您将在Zookeeper节点/brokers/topics上创建一个观察器。当这里添加新的孩子时,这是一个新的主题被添加,并且您的观察者将被触发。

+0

感谢您的回复和帮助...基本上我想使用KafkaConsumer API来实现这一点,我自己解决它.. – siddu

+0

这是如何解决的?我也有同样的问题。 – madlad

+0

@siddu你能告诉我你是如何解决这个问题的? – bhspencer

5

在apache kafka邮件存档中有一个答案。我下面将其复制:

消费者支持配置选项“metadata.max.age.ms” 基本上控制元数据多久的话题是牵强。默认设置为 ,设置为相当高(5分钟),这意味着需要 长达5分钟才能发现与常规 表达式匹配的新主题。您可以将此设置得更低以更快地发现主题。

所以在你的道具,您可以:

props.put("metadata.max.age.ms", 5000); 

这将导致你的消费者,以了解每5秒的新课题。