我使用卡夫卡消费者从几个主题阅读,我需要那些之一,具有更高的优先级。处理需要很多时间,并且总是有很多消息处于(低优先级)主题中,但我需要尽快处理来自其他消息的消息。卡夫卡消费者 - 主题(S)具有较高优先级
这类似的问题,因为Does Kafka support priority for topic or message?但这一个是使用旧的API。
在新的API(0.10.1.1)的方法有
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
但它不是我清楚,如何有效地检测出有高优先级的主题新的消息,并有必要暂停消费来自其他主题。
任何想法/例子吗?
您可以检查是否为您监视的分区endOffsets比那些分区最后提交的偏移较大。这是如何工作的将是特定实现,但会让你知道在你投票之前是否有更多的消息需要消费 – dawsaw