在0.11或更高版本中,您可以运行bin/kafka-delete-records.sh命令来标记要删除的消息。
https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
例如发布100个消息
seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
然后删除那些100个的消息90与新kafka-delete-records.sh 命令行工具
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
其中offsetfile.json包含
{"partitions": [{"topic": “mytest", "partition": 0, "offset": 90}], "version":1 }
然后消耗从头开始的消息以验证消息中的90个确实标记为已删除。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100
您首先提到的保留时间技巧要好得多。第二种方式会导致复制主题出现问题,并导致主题的元数据与实际情况不一致。请注意,偏移号码不会回到零。 – dawsaw