2017-07-17 195 views
0

我使用卡夫卡消费者从几个主题阅读,我需要那些之一,具有更高的优先级。处理需要很多时间,并且总是有很多消息处于(低优先级)主题中,但我需要尽快处理来自其他消息的消息。卡夫卡消费者 - 主题(S)具有较高优先级

这类似的问题,因为Does Kafka support priority for topic or message?但这一个是使用旧的API。

在新的API(0.10.1.1)的方法有

KafkaConsumer::pause(Collection) 
KafkaConsumer::resume(Collection) 

但它不是我清楚,如何有效地检测出有高优先级的主题新的消息,并有必要暂停消费来自其他主题。

任何想法/例子吗?

+1

您可以检查是否为您监视的分区endOffsets比那些分区最后提交的偏移较大。这是如何工作的将是特定实现,但会让你知道在你投票之前是否有更多的消息需要消费 – dawsaw

回答

1

我终于解决了,因为dawsaw建议 - 在处理循环中,我存储所有主题/分区,我从读:

  • beginningOffsets
  • endOffsets
  • 承诺 - 我不能使用位置,因为我订阅主题,而不是分区。

每当(endOffset - commited) > 0任何优先议题,我叫consumer.pause()非优先主题和(endOffset - commited) == 0所有优先议题后再次恢复的。

+0

你能分享你的策略来解决这个问题吗?假设我们有(总共10 Gbs)低优先级消息和一些高优先级消息。我们有多个消费者和多个生产者。即使我们暂停了消费者,我们也需要暂停所有其他主题的制作者,以便让您的想法实现。对?您有没有这方面的经验,因为在100个服务和10个主题生态系统中,这似乎几乎不可能? - 是的,我已阅读你有关此事的其他问题。谢谢 – JSBach

+0

没有 - 没有必要暂停任何制作人 - 这个想法是,你有单个消费者订阅了几个主题(其中一些主题是高优先级和其他普通优先级)。在轮询新消息之前,您需要检查优先主题的滞后时间。如果任何这些滞后不为零,这意味着,您需要暂停订阅正常优先级的主题,而不是“消耗”消费者的时间。处理来自高优先级主题的所有消息后,可以再次恢复正常优先级的消息。 – miran

+0

谢谢。我不能完全违抗。但是它对于大型系统来说味道不好。一旦大坝门打开了大量的数据,我将不得不时刻检查,如果我正在浪费这个低优先级队列的资源。我为什么要?对。无论如何。再次感谢 – JSBach

0

我猜你可能在你的位置(),并提交()的搭配方法。 位置()方法的偏移的下一个记录,将被获取并承诺()方法获取最后提交的对于给定的分割区的偏移(如文档中所述)的。 在轮询较低优先级之前,您可以检查较高优先级的位置()和提交()。如果position()高于committed(),则可以在较高优先级()上暂停()较低优先级和poll(),然后恢复较低优先级。