0

我想了解一些基本的Kafka概念,以便我可以正确监视基于KafkaStreams的应用程序的进度。Kafka重置工具消费者偏移不重置为零

专门用于调试目的,我需要能够让我的应用程序重新使用整个主题。为此我使用了reset tool

在执行脚本查看一些输入主题的卡夫卡管理器后,我看到Consumer Offset已经减少,而Lag已经增加(这是有道理的)。虽然Consumer Offset不会为零。我试图解释这一点,但我还没有找到关于卡夫卡经理中的Consumer OffsetLogsize是指什么的具体解释。

为了使它适合我所看到的,我假设Logsize是自开始以来放置到主题中的消息总量,但不一定是当前在主题中的消息量。有些人可能因年龄超过保留期而被抛弃。我对吗?

如果不是,那么在运行某些输入主题的重置工具后,我观察到Consumer Offset等于Logsize(而不是零)并且Lag为零?

+0

你确定主题中的第一个偏移量为0? – Natalia

回答

2

我对yahoo-kafka-manager不熟悉,但是,您也可以使用bin/kafka-consumer-groups.sh(Kafka自带的工具)。有LOG-END-OFFSET意味着你所描述的。从命名的角度来看,如果Logsize与“日志结束偏移量”相同或者分区中最高和最低偏移量之间的差异,我不清楚。

在执行脚本查看Kafka管理器中的一些输入主题后,我发现Consumer Offset已经减少并且延迟增加。

这很有道理 - 因为“滞后”是“日志结束偏移量”和“承诺偏移量”的差异,所以在重置应用程序之后,滞后量应该增加。 但是,我不确定为什么承诺的消费者群体抵消不是零(你可以非常使用 bin/kafka-consumer-group.sh - 也许 yahoo-kafka-manager报告不同的东西)。

更新:但是,该工具不会将偏移量设置为零,而是设置为“开始日志”。 (该文档是不正确的。)

另外请注意,您重置您的应用程序,然后重新启动后auto.offset.reset策略可能在打勾([承诺]偏移 可能是无效的,如果日志截断了)。这能解释你观察到的行为吗?

本博客文章也可能有助于了解进一步的细节:https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

+0

通过将数据写入一些测试主题我验证了我对关于Logsize的含义的假设是正确的。但我认为'auto.offeset.reset'不需要我看到什么。我认为重置工具本身会选取最新的可用偏移量,如果零不可用,则不会将偏移量重置为零。我通过将该值设置为最新和最早并且没有任何变化进行检查。如果重置工具将偏移量设置为零,那么'auto.offset.reset'在每种情况下都会有不同的表现。重置工具是否可以这样更新? – LetsPlayYahtzee

+1

嗯......不确定。顺便说一句:文档不完全正确。输入主题的工具“seeksToBeginning”(参见https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L223)所以'auto.offset。当您重置后启动Streams应用程序时,不应触发“重置”(有什么意义 - 文档不正确:( - 我将打开一个PR来修复下一版本的文档) –

相关问题