2017-10-10 181 views
0

我想让我的消费者动态更新其消费。如何通知消费者在Kafka中已创建新主题?

让我给你一个更具体的例子,使用动物。想象一下,我有一家宠物商店,每个话题都是一种动物(如狗,猫,鱼)。我的卡夫卡消费者的主要责任是抓住我们在卡夫卡的任何日志/记录/消息并将它们存储到数据库中。

假设我的消费者正在主动消费dogscats主题并且一切正常,现在有一种新型动物进入商店并在卡夫卡群集中生成新主题。我如何通知我的消费者已添加新主题?

我有两个建议,我想看看哪一个你认为更好?或者如果有更好的第三种选择,请让我知道。

1.)生产者发送一个http请求给消费者,让消费者知道生产者即将创建一个新的主题,以便消费者可以相应地采取行动。这种方法的问题在于,存在竞争条件。在主题创建之前,消费者有可能尝试消费。 (我刚刚发现,如果我将auto.topic.creation.enable设置为true,竞态条件并不是真正的问题。)

2.)在Kafka集群中创建一个名为topic_updates的额外主题。所以每当生产者成功地向Kafka集群发送消息时,它就通过这个topic_updates广播消息,也许一个简单的字符串就可以做到。消费者正在积极倾听这个话题的更新。

3.)我不知道,理想情况下,我希望卡夫卡可以在创建新主题时发出一个事件。

预先感谢您

回答

1

消费者能够自动找到新创建的主题,你可以简单地通过调用consumer.subscribe(Pattern.compile(".*"));

订阅的所有主题可能会降低下来metadata.max.age.ms有消费者知道新的话题更快。

0

您可以使用新的KafkaAdminClient并以某种方式监视主题列表并检查新增加的内容。以下是一个示例代码,其中提供了主题列表(不包括内部主题):

Properties properties = new Properties(); 
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
KafkaAdminClient kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties); 
ListTopicsResult listTopicResult = kafkaAdminClient.listTopics(); 
System.out.println(listTopicResult.names().get().toString()); 
相关问题