2016-06-14 135 views
0

如果消费者使用auto.offset.reset=latest,那么在发布者和N使用者使用auto.offset.reset=latest之后,他们会错过在订阅该消息之前发布到某个主题的所有消息......消费者使用auto.offset.reset=latest直到所有的用户开始使用消息,然后开始出版使卡夫卡消费者在订阅之前消费现有消息

  1. 制作发行商等待:不重播的话题存在认缴之前的消息...

    所以我需要两种。不知道如何做到这一点,而不是利用Zookeeper。卡夫卡是否提供了这样做的手段?

  2. 另一种方法是有auto.offset.reset=latest消费者,使他们明确地消耗掉所有现有的消息之前的情况下,他们即将订阅与现有消息的主题...

,这是什么情况下,最好的做法是什么?

我想消费者必须检查现有消息的主题,如果有消息,则消费它们,然后启动消费。这听起来像是对我来说最好的办法......

+0

使用'auto.offset.reset =最早'有没有什么坏处? – avr

+0

如果你使用最早的,你需要记忆或坚持最后的偏移量,以便知道上次停止的位置,对吗?我只是意识到,这两个选项都是错误的,唯一的方法是获得最后的偏移量... – lisak

+0

是的,你是对的。你只需要每次跟踪偏移量。你介意告诉你的用例,比如你在哪里使用Kafka,以及你正在使用哪种处理引擎,如火花或风暴?无论是批处理还是流媒体? – avr

回答

-1

如果一个高层次的消费得到启动,它执行以下操作:

  1. 外观为它的消费群

    一个坚定的偏移。如果发现有效偏移量,则从那里恢复

    b。如果没有有效的偏移发现,根据auto.offset.reset

因此,auto.offset.reset设定偏移才会触发,如果没有有效的抵消承诺。此行为是有意和必要的,以便在出现故障时提供至少一次处理保证。

因此,你想读从一开始的话题,你可以使用一个新的消费group.id并设置auto.offset.reset = earliest,或者你明确修改使用seekToBeginning()启动时的偏移您启动poll()循环之前。

0

我们使用Eureka提供的服务发现功能(任何其他服务发现应用可以完成这项工作)+混叠来做选项(1)。基本上,发布者不会注册自己(并且开始处理请求或发布通知),直到至少有一个订阅者可用。