2016-02-11 133 views
1

我正在尝试使用Java API监控给定组的消费者偏移量。我创建了一个不订阅任何主题的额外消费者,但只需拨打consumer.committed(topic)即可获取抵消信息。这种作品,但:卡夫卡0.9新消费者api ---如何看消费者偏移

为了测试我只使用一个真正的消费者(即订阅该主题的消费者)。当我使用close()将其关闭并稍后重新启动时,尽管我使用poll(1000),但在订阅和第一次使用邮件之间需要27秒。

我猜这与重新平衡有可能被非订阅消费者混淆。这可能吗?有没有更好的方法来监控Java API的偏移量(我知道命令行工具,但需要使用API​​)。

回答

1

有不同的方法来检查从主题偏移,取决于目的你想要的它,除了的“承诺”,你如上所述,这里有两个更多的选择:如果你想

1)要知道偏移ID从消费者开始来从经纪人下一次线程(S)启动(S)的数据,则必须使用“位置”作为

long offsetPosition; 
TopicPartition tPartition = new TopicPartition(topic,partitionToReview); 
    offsetPosition = kafkaConsumer.position(tPartition); 
    System.out.println("offset of the next record to fetch is : " + position); 

2)调用“偏移()”方法来自ConsumerRecord对象,在执行了来自kafkaConsumer的民意调查之后

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator(); 
while(it.hasNext()){ 
ConsumerRecord<byte[],byte[]> record = it.next(); 
System.out.println("offset : " + record.offset()); 
} 
+0

这两种方法仅适用于订阅的消费者。重点是我有一个消费者只是监控。它不会参与消费,所以它不能使用这些方法。 – Harald

1

发现它:监控消费者增加了困惑,但不是罪魁祸首。最后很容易理解,但有点意外(至少对我来说):

session.timeout.ms的默认值是30秒。当消费者消失时,需要长达30秒的时间才会被宣布死亡,并且工作会重新平衡。为了测试,我停止了我的单一消费者,等待了三秒钟,然后重新启动了一个新消费者。然后在它开始前27秒,填满30秒超时。

我原本以为一个单独的消费者启动不会等待超时过期,而是开始“重新平衡”,即立即抓住工作。看起来,即使只有一个消费者,工作重新平衡之前,超时也必须到期。

为了让测试更快地完成,我将配置更改为对代理使用较低的session.timeout.ms,对于代理使用较低的session.timeout.ms以及group.min.session.timeout.ms

总结:使用未订阅任何主题的消费者来监控偏移量工作得很好,似乎不会干扰再平衡过程。