我正在使用Confluent kafka C#客户端。如何获得这个主题中消耗的最新偏移量?如何获得Confluent kafka C#库中Kafka主题的最新偏移量?
0
A
回答
0
当您收到一条消息时,它应该包括来自它的位置的主题,分区和偏移量(除了密钥和值)。
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
$"Offset: {msg.Offset} {msg.Value}");
当它到达每个主题分区
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
$" , next message will be at offset {end.Offset}");
1
除了前面的回答的最后,你还可以得到一个事件,你可以使用
List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
它将返回针对给定主题/分区从librdkafka轮询的最后一个偏移量
您也有类似的Committed
方法,最新的承诺,从消费者
偏移还可以查询最新的已知偏移
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
它会发送到卡夫卡集群的请求。该通话被阻止,请设置适当的超时时间。目前,您无法一次在多个分区上发送请求。 你可以用它要么得到最后一个已知的偏移,无论是计算滞后
还有
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
将查询内部状态librdkafka,并可能返回INVALID_OFFSET(-1001)。您可以使用它来检测由于处理数据而导致的一些滞后。 (此方法的位置和结果之间的差异)
0
而不是检索来自消费者偏移信息(我不想第一个消费信息),我是能够生产这样的阅读主题偏移(high
和low
):
var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));
相关问题
- 1. 如何获得kafka的最新偏移
- 2. Kafka Spark-Streaming偏移问题
- 3. confluent platform vs apache kafka
- 4. Flink(Kafka源)如何管理偏移量?
- 5. Confluent Kafka:如何在confluent-kafka-python客户端中指定序列化和分区?
- 6. 在Kafka中读取偏移量
- 7. Kafka喜欢Kinesis Stream上的偏移量?
- 8. confluent-kafka python avro消息
- 9. Kafka EARLIEST和Kafka LATEST偏移重置的效率如何?
- 10. Kafka的Spark流:如何从Kafka consumer DStream获取主题名称?
- 11. 如何获得Kafka 0.10.x中没有外壳的电流偏移量?
- 12. 如何访问和提交kafka的__consumer_offsets主题上的偏移量?
- 13. Spark Streaming Kafka初始偏移量
- 14. kafka和Spark:通过API获取主题的第一个偏移量
- 15. Can Kafka 10 kafka-consumer-groups.sh描述给定组的单个主题的偏移量吗?
- 16. 如何获得apache kafka中的所有主题?
- 17. 主题级别的kafka偏移和日志保留配置
- 18. kafka spout无法连接到kafka主题
- 19. Kafka Streams - 获得KTable和KStream相同主题的最佳方式?
- 20. 如何使用kafka连接将kafka主题下载到oracle中
- 21. Spring Kafka倾听所有主题并调整分区偏移
- 22. 如何删除apache中的主题kafka
- 23. kafka:从第二个主题读取偏移recvd从第一个
- 24. Apache-Kafka-Connect,Confluent-HDFS-Connector,未知魔术字节,Kafka-To-Hdfs
- 25. 如何重置,更新或清除每个消费群组在Kafka中的主题的偏移量?
- 26. 从C++代码中删除kafka主题
- 27. Apache Kafka消费者组的偏移量如何到期?
- 28. 如何获取Golang Kafka中的分区的消费者组偏移量10
- 29. 在Kafka中解释复制偏移量检查点和恢复点偏移量
- 30. Apache Kafka:如何接收来自Kafka的最新消息?