2016-07-07 42 views
1

假设我有一个服务,它通过kafka-rest-proxy使用消息,并始终在同一个使用者组上。我们还要说,它正在消耗一个有一个分区的主题。当服务启动时,它会在kafka-rest-proxy中创建一个新的使用者,并使用生成的使用者url直到服务关闭。当服务恢复时,它将在kafka-rest-proxy中创建一个新的消费者,并使用新的url(和新消费者)进行消费。卡夫卡休息代理消费者创建

我的问题

  1. 因为卡夫卡只能有每个分区最多一个消费者。当消费者重新启动时,在kafka和kafka-rest-proxy中会发生什么?即新的消费者是在卡夫卡休息代理中创建的,但旧的消费者没有机会被销毁。所以现在'n'消费者在'kafka-rest-proxy'中重新启动我的服务后有'n'个消费者,但其中只有一个正在被消费。我甚至能够消费我的新消费者的消息,因为消费者多于分区?

  2. 让我们更复杂一些,并说我的服务在同一个用户组和5个分区中有5个实例。 'n'重新启动服务的所有5个实例之后,我甚至会保证在不确保现有消费者适当销毁的情况下消费所有消息。即,在消费者创造期间,当消费者出门时,卡夫卡和卡夫卡休息代理人做了什么?

  3. 什么被认为是卡夫卡休息代理的最佳实践,以确保陈旧的消费者总是清理?你建议坚持消费者的网址?我应该强制重新启动一个kafka-rest-proxy,以确保在启动我的服务之前销毁现有的消费者?

*编辑* 我相信我的问题的一部分,此配置回答,但不是全部。

consumer.instance.timeout.ms - 消费者实例自动销毁之前的空闲时间量。 类型:int 默认:300000 重要性:低

回答

2
  1. 如果无法正常关闭消费者,将活着的最后一个请求后的一段是向它提出的。该代理将垃圾收集陈旧的消费者正是这种情况 - 如果它不是干净关闭,消费者将无限期地坚持一些分区。通过自动垃圾收集消费者,您不需要一些单独的持久存储来跟踪您的消费者实例。正如您发现的,您可以通过config consumer.instance.timeout.ms来控制此超时。

  2. 由于实例将被垃圾收集,因此您将保证最终会使用所有消息。但在超时期间,有些分区仍然可能会分配给旧的使用者组,您不会在这些分区上取得任何进展。

  3. 理想情况下,您的应用程序不清洁关机很罕见,所以最佳做法是在应用程序关闭时清理消费者。即使在特殊情况下,您也可以使用区块的try/catch/finally销毁用户。如果一个人活着,它最终会恢复。除此之外,如果您的应用程序可以容忍这一点,请考虑调整consumer.instance.timeout.ms设置为较低。它只需要大于使用消费者的呼叫之间的最长时间(并且您应该记住可能的错误情况,例如,如果处理消息需要与另一个系统进行交互,并且该系统可能变得很慢/无法访问,那么您应该考虑当设置这个配置)。

你可以坚持的URL,但即使是在输球的消费者,因为你不能创造原子消费者和它的URL保存到其他一些持久性存储的轨迹一定的风险。此外,由于完全不受控制的故障,如果您没有清理机会的情况不应该是常见情况,那么通常不会让您受益。如果您需要从该故障中快速恢复,则无论如何,您的应用程序的消费者实例超时可能会显着减少。

Re:强制重新启动代理,这将是相当不常见的,因为REST代理通常是共享服务,并且这样做会影响所有其他正在使用它的应用程序。