2017-07-08 94 views

回答

0

当您收到一条消息时,它应该包括来自它的位置的主题,分区和偏移量(除了密钥和值)。

example here

consumer.OnMessage += (_, msg) 
    => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " + 
     $"Offset: {msg.Offset} {msg.Value}"); 

当它到达每个主题分区

consumer.OnPartitionEOF += (_, end) 
    => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" + 
      $" , next message will be at offset {end.Offset}"); 
1

除了前面的回答的最后,你还可以得到一个事件,你可以使用

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions) 

它将返回针对给定主题/分区从librdkafka轮询的最后一个偏移量

您也有类似的Committed方法,最新的承诺,从消费者


偏移还可以查询最新的已知偏移

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout) 

它会发送到卡夫卡集​​群的请求。该通话被阻止,请设置适当的超时时间。目前,您无法一次在多个分区上发送请求。 你可以用它要么得到最后一个已知的偏移,无论是计算滞后

还有

WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition) 

将查询内部状态librdkafka,并可能返回INVALID_OFFSET(-1001)。您可以使用它来检测由于处理数据而导致的一些滞后。 (此方法的位置和结果之间的差异)

0

而不是检索来自消费者偏移信息(我不想第一个消费信息),我是能够生产这样的阅读主题偏移(highlow):

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10)); 
相关问题