2017-09-27 314 views
0

我在我的框架中使用了卡夫卡生产者 - 消费者模型。在消费者端消费的记录稍后被索引到elasticsearch上。在这里,我有一个使用案例,如果ES停止运行,我将不得不暂停卡夫卡消费,直到ES启动。一旦启动,我需要恢复使用者并从我最后离开的位置消耗记录。 我不认为这可以通过@KafkaListener来实现。任何人都可以请给我一个这样的解决方案?我发现我需要为此编写自己的KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。如何暂停卡夫卡消费者?

回答

0

有可能的sevaral解决方案,一种简单的方法是使用KafkaConsumer API。在KafkaConsumer实现中,跟踪将在下一次调用poll(...)时检索的主题位置。你的问题是在你从Kafka获得记录后,你可能无法将其插入到Elastic Search中。在这种情况下,你必须编写一个例程来重置消费者的位置,在你的情况下这将是consumer.seek(分区,consumer.position(分区)-1)。这将重置位置到更早的位置。在这一点上,一个好的方法是暂停分区(这将使服务器执行一些资源清理),然后轮询ES(通过您希望的任何机制)。一旦ES可用,请在消费者上致电恢复并继续进行通常的轮询插入周期。

EDITED讨论

之后创建的Spring bean与指定的生命周期方法。在bean的初始化方法中实例化你的KafkaConsumer(从任何来源检索消费者的配置)。从该方法开始一个线程与消费者交互并更新ES,其余设计如上所述。这是一个单一的模型。为了获得更高的吞吐量,考虑将从Kafka中检索到的数据保存在内存队列中的小内存中,并且调度程序线程将消息传递给共享线程以更新ES。

+0

我知道如何做到这一点的理论,原本以为的相当接近你的答案的东西,我的问题是与实现。你可以请同样的代码片段吗? –

+0

如果您使用KafkaConsumer很好,实现很简单,所有列出的方法都可用,您可能无法使用注释执行此操作。在执行中您遇到问题的位置 – Ironluca

+0

My Consumer是一个POJO侦听器,使用@KafkaListener进行了注释。我有一个ConsumerConfig类,用于此使用者的配置。我的听者的方法如下: @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID,ContainerFactory通过= “kafkaListenerContainerFactory”) 公共无效过程(ConsumerRecord 记录){ logger.info( “记载:” +记录); } 我不知道从哪里获取'消费者'对象来申请寻找和其他方法。 –

0

我建议宁可暂停使用者,为什么不能一次又一次地重试同一个消息,并且一旦消息成功使用后提交偏移量。

例如:

注解你的方法与@Retryable

和块用的try/catch你的方法,并在catch块抛出新的异常。

对于ListenerFactory配置中添加属性:

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); 
factory.getContainerProperties().setAckOnError(false);