2016-11-18 154 views
0

我想在语法上重新创建卡夫卡主题。 我使用相同的kafka.admin.AdminUtils重复卡夫卡主题

这里是我的rought代码:

AdminUtils.deleteTopic(zkUtils, topicName); 
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties()); 

上面的代码是工作的大部分时间,但多次失败与以下情况除外:

Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists. 
     at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253) 
     at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237) 
     at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) 

我的理解是,主题不正确删除。我在这里做错了什么。

回答

2

在调用deletedTopic之后,您不能调用createTopic。有两件事情必须照顾:

  1. 设置“delete.topic.enable”真

  2. 由于删除主题异步操作,你最好确保所有元数据已被成功删除在创建新话题之前从Zookeeper中删除

+0

我已经将delete.topic.enable设置为true。为了达到第2点,是否有任何来自kafka的回调,当删除操作成功完成时通知它? –

+0

似乎没有开箱即用的回调功能可以使用。一种可能的方法是监视zk节点'/ admin/delete_topics/'的存在。删除这个zk节点是删除主题过程中的最后一步,在清除不容易被监控的控制器缓存之前发生该主题。 – amethystic