2017-04-17 194 views
2
#!/bin/zsh 
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16') 
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092') 
topics=('t1' 't2' 't1_failed' 't2_failed') 


NORMAL=$(tput sgr0) 
GREEN=$(tput setaf 2; tput bold) 
YELLOW=$(tput setaf 3) 
RED=$(tput setaf 1) 

function red() { 
    echo -e "$RED$*$NORMAL" 
} 

function green() { 
    echo -e "$GREEN$*$NORMAL" 
} 

function yellow() { 
    echo -e "$YELLOW$*$NORMAL" 
} 



for topic in $topics; do 
    yellow "Cleaning up messages in topic @ " $topic 
    yellow "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
done 

red "Waiting 120 seconds for messages to expire" 
sleep 120 

for topic in $topics; do 
    green "Restoring config of topic @ " $topic                 
    green "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms     
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
    $KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic 
done 

当我运行此脚本时 - 我可以看到config.retention.ms已更改为100ms,但延迟120秒后 - 我仍然在所有kafka主题中看到相同的消息。卡夫卡的retention.ms没有被卡夫卡0.10.2强制执行?

那么如何清除邮件?

感谢, 梅德

回答

3

你必须等待log.retention.check.interval.ms默认为5分钟。

5

它比接受的答案还多一点。卡夫卡将消息存储在文件系统上的日志文件中。这些文件具有翻转(由时间或大小配置)。一旦文件不再是当前文件,Kafka将不再追加到该文件。

现在的乐趣部分:卡夫卡不会过期的个人消息。一旦该文件中消息的最高时间戳比retention.ms旧,它将(对于非紧凑主题)删除整个日志文件。保留时间告诉你至少至少可用的消息,但它可能可用的时间更长(取决于翻转配置和消息量)。

在较早的Kafka版本中,这不是基于消息时间戳,而是基于对日志文件的写入访问。感谢@dawsaw指出这一点。

+0

此处描述的行为适用于0.10.2之前的版本。上次修改时间不再使用。而是使用时间索引。 – dawsaw

+0

你碰巧有链接或KIP吗?目前的文档仍然说“数据一次被删除一个日志段,日志管理器允许可插拔删除策略选择哪些文件符合删除条件,但是当前策略会删除任何修改时间超过N天前的日志,尽管保留最后N GB的政策也可能有用。“ (https://kafka.apache.org/documentation/#impl_deletes) – ftr

+0

这是一个0.10.1的显着变化https://kafka.apache.org/documentation/#upgrade_10_1_breaking – dawsaw