我在我的框架中使用了卡夫卡生产者 - 消费者模型。在消费者端消费的记录稍后被索引到elasticsearch上。在这里,我有一个使用案例,如果ES停止运行,我将不得不暂停卡夫卡消费,直到ES启动。一旦启动,我需要恢复使用者并从我最后离开的位置消耗记录。 我不认为这可以通过@KafkaListener来实现。任何人都可以请给我一个这样的解决方案?我发现我需要为此编写自己的KafkaListenerContainer,但我无法正确实现它。任何帮助将非常感激。如何暂停卡夫卡消费者?
0
A
回答
0
有可能的sevaral解决方案,一种简单的方法是使用KafkaConsumer API。在KafkaConsumer实现中,跟踪将在下一次调用poll(...)时检索的主题位置。你的问题是在你从Kafka获得记录后,你可能无法将其插入到Elastic Search中。在这种情况下,你必须编写一个例程来重置消费者的位置,在你的情况下这将是consumer.seek(分区,consumer.position(分区)-1)。这将重置位置到更早的位置。在这一点上,一个好的方法是暂停分区(这将使服务器执行一些资源清理),然后轮询ES(通过您希望的任何机制)。一旦ES可用,请在消费者上致电恢复并继续进行通常的轮询插入周期。
EDITED讨论
之后创建的Spring bean与指定的生命周期方法。在bean的初始化方法中实例化你的KafkaConsumer(从任何来源检索消费者的配置)。从该方法开始一个线程与消费者交互并更新ES,其余设计如上所述。这是一个单一的模型。为了获得更高的吞吐量,考虑将从Kafka中检索到的数据保存在内存队列中的小内存中,并且调度程序线程将消息传递给共享线程以更新ES。
0
我建议宁可暂停使用者,为什么不能一次又一次地重试同一个消息,并且一旦消息成功使用后提交偏移量。
例如:
注解你的方法与@Retryable
和块用的try/catch你的方法,并在catch块抛出新的异常。
对于ListenerFactory配置中添加属性:
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
相关问题
- 1. 暂停高级卡夫卡消费者
- 2. 卡夫卡gruop消费者
- 3. 卡夫卡消费者不消费
- 4. 如何从生产者消费卡夫卡的消费者?
- 5. 卡夫卡消费者/生产者API
- 6. 消费消费使用卡夫卡消费者 - Java
- 7. 卡夫卡10.2新消费者与旧消费者
- 8. 复位消费者在卡夫卡0.10
- 9. 无法创建卡夫卡消费者
- 10. Spark Streaming中的卡夫卡消费者
- 11. 卡夫卡消费者同步行为
- 12. 问题在消费者卡夫卡
- 13. 卡夫卡消费者不是从
- 14. 卡夫卡消费者行为
- 15. 关闭卡夫卡消费者
- 16. 卡夫卡消费者行为
- 17. 春季集成卡夫卡消费者
- 18. 卡夫卡消费者 - Java客户端
- 19. 卡夫卡消费者与JAVA
- 20. 卡夫卡0.90消费者坚持
- 21. 云中的卡夫卡消费者
- 22. 卡夫卡kafka.common.MessageSizeTooLargeException在消费者
- 23. 卡夫卡消费者滞后JMX
- 24. 卡夫卡:消费者崩溃
- 25. 卡夫卡消费者查询
- 26. 卡夫卡0.9新消费者api ---如何看消费者偏移
- 27. 卡夫卡消费者没有消费数据
- 28. 如何关闭卡夫卡消费者一旦消费完所有消息?
- 29. 卡夫卡消费10节
- 30. 卡夫卡消费群挂
我知道如何做到这一点的理论,原本以为的相当接近你的答案的东西,我的问题是与实现。你可以请同样的代码片段吗? –
如果您使用KafkaConsumer很好,实现很简单,所有列出的方法都可用,您可能无法使用注释执行此操作。在执行中您遇到问题的位置 – Ironluca
My Consumer是一个POJO侦听器,使用@KafkaListener进行了注释。我有一个ConsumerConfig类,用于此使用者的配置。我的听者的方法如下: @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID,ContainerFactory通过= “kafkaListenerContainerFactory”) 公共无效过程(ConsumerRecord , ?>记录){ logger.info( “记载:” +记录); } 我不知道从哪里获取'消费者'对象来申请寻找和其他方法。 –