0
我想在语法上重新创建卡夫卡主题。 我使用相同的kafka.admin.AdminUtils
。重复卡夫卡主题
这里是我的rought代码:
AdminUtils.deleteTopic(zkUtils, topicName);
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties());
上面的代码是工作的大部分时间,但多次失败与以下情况除外:
Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
我的理解是,主题不正确删除。我在这里做错了什么。
我已经将delete.topic.enable设置为true。为了达到第2点,是否有任何来自kafka的回调,当删除操作成功完成时通知它? –
似乎没有开箱即用的回调功能可以使用。一种可能的方法是监视zk节点'/ admin/delete_topics/'的存在。删除这个zk节点是删除主题过程中的最后一步,在清除不容易被监控的控制器缓存之前发生该主题。 –
amethystic