2017-01-18 42 views
0

我有一个REST服务,可以称之为MDD,它有一个卡夫卡消费者。当我第一次启动其他服务时,另一项服务告诉MDD的消费者订阅特定主题,一切似乎都很顺利。一个消费者多个主题导致潜在的线程问题?

然后该服务告诉MDD的消费者订阅另一个主题。我现在正在做的方式是通过consumer.assign()方法。基本上,如果一个新的话题被引入到消费者没有被分配到的地方,我将这个新话题分配给消费者。所以现在一个消费者被分配到两个不同的主题。

这位消费者调查消息并将它们存入HDFS。

现在我注意到了,当第二个主题的订阅进来时,有时我得到了无法附加到HDFS文件的错误,当我查看日志时,它试图追加一些数据不应该被追加到以后。 例如,kafka的数据按照A,B,C的顺序出现。当MDD完成后,将HD添加到HDFS,它试图追加C(而不是B),并同时尝试追加B.另外还有一点需要注意的是,目前没有数据来自第一个主题,只有来自第二个主题的数据正在流入。因此,目前只有一个kafka主题在任何给定时间都有数据流入。

任何人有任何想法可能会发生什么?将一位消费者分配给多个主题时,是否可能创建一些线索问题?因为当消费者被分配到一个主题时一切似乎都很顺利,但只要分配给多个主题,我就无法追加到HDFS中的文件,因为其他一些作者已经拥有租约。这个错误不会经常发生,只是非常随机。

此外,每次创建新主题时都会推荐修复,创建一个新的卡夫卡消费者?

回答

0

绝对有效且可以让一位消费者阅读来自多个主题的消息。您遇到的问题是因为Kafka目前不支持使用手动分区分配(使用KafkaConsumer#assign)和组管理(使用KafkaConsumer#subscribe)。

要支持订阅新创建的主题,您可以尝试调用传递正则表达式的KafkaConsumer#subscribe,以匹配所有新创建的主题。

+0

请您详细说明这个问题:您遇到的问题是因为Kafka目前不支持使用手动分区分配(使用KafkaConsumer#assign)和组管理(使用KafkaConsumer#subscribe)。我仍然不理解为什么造成这个问题。 – StephCurry3093

+0

简而言之,你不应该使用KafkaConsumer.subscribe和KafkaConsumer.assign,所以坚持只使用其中的一个。 – amethystic

+0

我只使用一个。首先我使用assign,然后尝试切换到订阅,并且仍然收到相同的错误 – StephCurry3093

相关问题