我有一个消费者组中有两个消费者分配了相同的kafka主题分区。我希望从消费者B内部获得最后一次读取偏移量,消费者A.任何想法,如何实现这一点?如何获取消费者组的最后消费抵消?
1
A
回答
2
0
卡夫卡店偏移量由(消费者组ID,主题,分区),所以首先要注意的一点是,从卡夫卡点认为没有像“最后一次读取消费者A的抵消额”那样的东西。您可以通过Kafka使用者API获得的所有信息都适用于给定的(组,主题,分区)。消费者API中有两种方法可能有用。
commited():获取给定分区的最后提交的偏移量(无论提交是由此进程还是其他进程发生的)。
position():获取将被提取的下一条记录的偏移量(如果具有该偏移量的记录存在)。
如果这不是你所需要的,那么你将不得不自己实现一些东西。假设你已经知道如何获得最后抵消消费的读操作,那么消费者A应该是值存储在某个位置,提供给消费者B.这个位置可能是
- 卡夫卡本身。例如,消费者A可以将最近的读取偏移量发布到 像ConsumerA-p0这样的众所周知的主题,而消费者B可以订阅 这个主题。
- 动物园管理员。再次,在一个众所周知的道路上达成一致。
- 外部数据库。
- 更基本的选择,如果消费者都共享同一个操作系统:IPC,在文件系统中的文件,在内存与锁保护的变量等
+0
我同意你和理论上它看起来似乎合理,但我想知道是否有某些功能由kafka本身提供服务的目的 – swappy
+0
添加更多的细节给我的答案。我认为这些都是你有的选择。 –
相关问题
- 1. 使用kafka库查找消费者抵消滞后的代码?
- 2. 卡夫卡消费者抵消最大值?
- 3. 从kafka读取消费者抵消的工具0.9
- 4. RabbitMQ pika消费者取消后的异步消费者心跳问题
- 5. 生产者消费者请求取消
- 6. 消费者生产者多线程消费者不会消逝
- 7. 如何从Salesforce获取消费者密钥和消费者密钥
- 8. 如何在使用Semphores的生产者 - 消费者中消费?
- 9. 如何从生产者消费卡夫卡的消费者?
- 10. 卡夫卡消费者不消费
- 11. 消费消费使用卡夫卡消费者 - Java
- 12. RabbitMQ消费者
- 13. 消费者池
- 14. Kafka消费群集环境抵消
- 15. 生产者 - 消费;消费者如何停止?
- 16. 如何查找从ActiveMQ获取消息的消费者的IP?
- 17. 消息消费者实现
- 18. 了解消费者组ID
- 19. 在RabbitMQ配置消费者取消
- 20. 如何使用消费者
- 21. 消费者过滤的生产者 - 消费者阻塞队列
- 22. 生产者/消费者线程中的油门消费者
- 23. Java生产者 - 消费者:生产者不“通知()”消费者
- 24. 卡夫卡工作/抵消协调与消费者
- 25. 消费者没有从卡夫卡消费者提交的消息10消费者
- 26. 生产消费者
- 27. 竞争消费者
- 28. Mochiweb + AMQP消费者
- 29. SAML消费者URL
- 30. ActiveMQ消费者OutOfMemoryException
你想有一个以上的消费者在消费从单个分区平行吗? –
@LucianoAfranllie类似的,但我可以让消费者平行读,但我希望只跟踪其他消费者在卡夫卡的最后读取偏移量。 – swappy