2017-02-28 41 views
1

我想为我们的某些人实现数据重放,为此,我需要使用Kafka保留策略(因为我使用的是join,并且我需要窗口时间准确)。 P.S.我使用的卡夫卡版本0.10.1.1Kafka保留策略不能按预期工作

我送我的数据转化为这样的话题:

kafkaProducer.send(
        new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r) 
      ); 

我创造我的题目是这样的:

卡夫卡的话题 - 创建 - -zookeeper localhost:2181 - 复制因子1 --partition 1 - topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms = 172800000

因此,通过上述设置,我应该将我的主题的保留时间设置为48小时。

我扩展TimestampExtractor为了记录每条消息的实际时间。

public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor { 
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class); 
    @Override 
    public long extract(ConsumerRecord<Object, Object> consumerRecord) { 
     LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp())); 
     return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis(); 
    } 
} 

对于测试,我发送了4条消息到我的主题,我得到这4条日志消息。

2017年2月28日10时23分39秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1488295086292人类readble -Tue 2月28日10时18分06秒美国东部时间2017年
2017年2月28日10:24 :01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:14832.72亿人readble -Sun 1月1 07:00:00 EST 2017年
2017年2月28日10时26分十一秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1485820800000人类readble -Mon 1月30日19:00:00东部时间2017年
2017-02-28 10点27分22秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:1488295604411人类readble -Tue 2月28日10点26分44秒美国东部时间2017年

因此,基于​​我希望看到我的两个中传递消息得到清除/删除5分钟后(第2次和第3次,因为它们是1月1日和1月30日)。但我试图消耗我的话题一个小时,每次我消费我的话题我收到了所有4条消息。

卡夫卡的Avro控制台消费者--zookeeper本地主机:2181 --from-开始--topic myTopic

我卡夫卡的配置是这样的:

############################# Log Retention Policy ############################# 

# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 

# The minimum age of a log file to be eligible for deletion 
log.retention.hours=168 

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining 
# segments don't drop below log.retention.bytes. 
#log.retention.bytes=1073741824 

# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 

上午我做错了什么或者我错过了什么?

回答

5

卡夫卡通过删除日志段来实现其保留策略。 Kafka永远不会删除活动段,这是它将添加发送到分区的新消息的段。卡夫卡只删除旧的细分市场。卡夫卡辊主动段成旧段当一个新的消息被发送到的分区,无论是

  • 与新的消息中的活性段的大小将超过log.segment.bytes,或
  • 第一时间戳在活动时间段消息比log.roll.ms以上(默认为7天)

因此,在你的榜样,你必须周二2月28日10时18分06秒美国东部时间2017年后要等7天,发送一个新的消息,那么所有4个初始消息将被删除。

+0

如果是这样怎么解释当我用1970年的时间戳(非常旧的消息)发送两条消息后5分钟都被删除? – Am1rr3zA