我使用KafkaConsumer消耗从卡夫卡服务器(主题)的消息..卡夫卡消费者动态检测主题添加
- 它工作正常的消费者开始编码之前创建的主题...
但问题是,如果主题创建动态(我的意思是说消费者代码开始后)说,它将无法正常工作,但API说它将支持动态主题创建..这里是链接供您参考..
使用的卡夫卡版本:0.9.0.1
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
这里是JAVA代码...
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Pattern r = Pattern.compile("siddu(\\d)*");
consumer.subscribe(r, new HandleRebalance());
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注:我的主题名称相匹配的正则表达式。 如果我重新开始消费,然后就会开始读邮件推主题...
任何帮助,非常感谢......
感谢您的回复和帮助...基本上我想使用KafkaConsumer API来实现这一点,我自己解决它.. – siddu
这是如何解决的?我也有同样的问题。 – madlad
@siddu你能告诉我你是如何解决这个问题的? – bhspencer