2016-08-24 105 views
10

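How do offsets expire for an Apache Kafka consumer group?

I was running some tests on an old topic when I noticed some strange behavior. Reading the Kafka log, I noticed this "Removed 8 expired offsets" message: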

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator) 
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log) 
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log) 
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log) 
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log) 
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log) 
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex) 
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log) 
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log) 
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex) 
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex) 
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator) 
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator) 
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager) 

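Actually, I have 2 questions:

1) How does this offset expiration work for a consumer group?

2) Can the expired offsets explain this behavior: the consumer does not poll anything when it has auto.offset.reset = latest, but it polls from the last committed offset when it has auto.offset.reset = earliest?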

Answer

19

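By default, Kafka deletes committed offsets after a configurable period of time; see the parameter offsets.retention.minutes. That is, if a consumer group is inactive (i.e., does not commit any offsets) for this period of time, its offsets are deleted.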
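
As a rough illustration of the retention rule described above, the sketch below models the cleanup in plain Python. It is not Kafka's implementation: the `CommittedOffset` record, the `remove_expired` helper, the topic name, and the 1440-minute retention value are all assumptions made for the example. Only the offsets.retention.minutes setting itself comes from Kafka.

```python
from dataclasses import dataclass


@dataclass
class CommittedOffset:
    """Hypothetical in-memory record of one committed offset."""
    offset: int
    commit_time_ms: int  # time of the last commit for this partition


def remove_expired(offsets, now_ms, retention_minutes):
    """Drop entries whose last commit is older than the retention period.

    Returns the number of removed entries.
    """
    retention_ms = retention_minutes * 60 * 1000
    expired = [key for key, entry in offsets.items()
               if now_ms - entry.commit_time_ms > retention_ms]
    for key in expired:
        del offsets[key]
    return len(expired)


HOUR_MS = 60 * 60 * 1000
# Keys are (group, topic, partition); the values are made up for the example.
cache = {
    ("GROUP_NAME", "topic", 0): CommittedOffset(offset=42, commit_time_ms=0),
    ("GROUP_NAME", "topic", 1): CommittedOffset(offset=7, commit_time_ms=20 * HOUR_MS),
}
# Assumed retention of 1440 minutes (24 hours), checked at hour 30.
removed = remove_expired(cache, now_ms=30 * HOUR_MS, retention_minutes=1440)
print(f"Removed {removed} expired offsets")
```

In this model, only the partition whose last commit is more than 24 hours old is dropped; the one committed 10 hours ago survives.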

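If you start a consumer, the following happens:

  1. Look for a (valid) committed offset (for the consumer group)
    1. If a valid offset is found, resume from there
    2. If no valid offset is found, reset the offset according to the auto.offset.reset parameter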
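
The steps above can be sketched as a small decision function. This is a simplified model, not the Kafka client's code: `resolve_start_offset` and `NoCommittedOffsetError` are hypothetical names, and treating "valid" as "still within the log's offset range" is an assumption of the sketch.

```python
class NoCommittedOffsetError(Exception):
    """Hypothetical error for the auto.offset.reset = none case."""


def resolve_start_offset(committed, log_start, log_end, auto_offset_reset):
    """Return the offset a consumer would start reading from.

    committed  -- last committed offset, or None if missing or expired
    log_start  -- first offset still available in the partition
    log_end    -- offset the next produced record will get
    """
    # Step 1: use the committed offset if it exists and is still in range.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Step 2: otherwise fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise NoCommittedOffsetError("no valid committed offset and no reset policy")


# A committed offset wins regardless of the reset policy.
print(resolve_start_offset(50, 0, 100, "latest"))
# With the offset gone, the policy decides where reading starts.
print(resolve_start_offset(None, 0, 100, "latest"))
print(resolve_start_offset(None, 0, 100, "earliest"))
```

The reset policy only matters in the second branch, which is why it has no visible effect as long as a committed offset is still stored.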

Thus, if your offsets were deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest, it should consume the whole topic.
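
To make the difference concrete, here is a toy model in which a topic partition is just a Python list and the committed offsets have already been deleted. The `poll` and `start_position` helpers are hypothetical and only imitate the behavior described here; they are not part of any Kafka client.

```python
def poll(log, position):
    """Return the records at or after `position` in a list-backed log."""
    return log[position:]


def start_position(log, auto_offset_reset):
    """Starting position when no committed offset exists."""
    return 0 if auto_offset_reset == "earliest" else len(log)


topic = ["a", "b", "c"]  # records already in the topic

latest_pos = start_position(topic, "latest")
earliest_pos = start_position(topic, "earliest")

before_latest = poll(topic, latest_pos)      # nothing to read yet
before_earliest = poll(topic, earliest_pos)  # the whole topic

topic.append("d")  # a producer adds new data
after_latest = poll(topic, latest_pos)       # only the new record

print(before_latest, before_earliest, after_latest)
```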

See this JIRA about this: https://issues.apache.org/jira/browse/KAFKA-3806

+0

Thanks for the discussion. So, even for an active consumer, the offsets get deleted if no new offsets are committed within this retention time? – Enzo

+0

As long as the group is active, the offsets will not be deleted. –

+1

That's not necessarily true. If you set enable.auto.commit = false and there is no new data (so no commits), the committed offsets will expire. – b2zw2a
