我想为我们的某些人实现数据重放,为此,我需要使用Kafka保留策略(因为我使用的是join,并且我需要窗口时间准确)。 P.S.我使用的卡夫卡版本0.10.1.1Kafka保留策略不能按预期工作
我送我的数据转化为这样的话题:
kafkaProducer.send(
new ProducerRecord<>(kafkaTopic, 0, (long) r.get("date_time") ,r.get(keyFieldName).toString(), r)
);
我创造我的题目是这样的:
卡夫卡的话题 - 创建 - -zookeeper localhost:2181 - 复制因子1 --partition 1 - topic myTopic
kafka-topics --zookeeper localhost --alter --topic myTopic --config retention.ms = 172800000 kafka-topics --zookeeper localhost --alter --topic myTopic --config segment.ms = 172800000
因此,通过上述设置,我应该将我的主题的保留时间设置为48小时。
我扩展TimestampExtractor
为了记录每条消息的实际时间。
public class ConsumerRecordOrWallclockTimestampExtractor implements TimestampExtractor {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerRecordOrWallclockTimestampExtractor.class);
@Override
public long extract(ConsumerRecord<Object, Object> consumerRecord) {
LOG.info("TIMESTAMP : " + consumerRecord.timestamp() + " - Human readable : " + new Date(consumerRecord.timestamp()));
return consumerRecord.timestamp() >= 0.1 ? consumerRecord.timestamp() : System.currentTimeMillis();
}
}
对于测试,我发送了4条消息到我的主题,我得到这4条日志消息。
2017年2月28日10时23分39秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1488295086292人类readble -Tue 2月28日10时18分06秒美国东部时间2017年
2017年2月28日10:24 :01 INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:14832.72亿人readble -Sun 1月1 07:00:00 EST 2017年
2017年2月28日10时26分十一秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP: 1485820800000人类readble -Mon 1月30日19:00:00东部时间2017年
2017-02-28 10点27分22秒INFO ConsumerRecordOrWallclockTimestampExtractor:21 - TIMESTAMP:1488295604411人类readble -Tue 2月28日10点26分44秒美国东部时间2017年
因此,基于我希望看到我的两个中传递消息得到清除/删除5分钟后(第2次和第3次,因为它们是1月1日和1月30日)。但我试图消耗我的话题一个小时,每次我消费我的话题我收到了所有4条消息。
卡夫卡的Avro控制台消费者--zookeeper本地主机:2181 --from-开始--topic myTopic
我卡夫卡的配置是这样的:
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
上午我做错了什么或者我错过了什么?
如果是这样怎么解释当我用1970年的时间戳(非常旧的消息)发送两条消息后5分钟都被删除? – Am1rr3zA