我已经实现了一个循环分区如下消息数:卡夫卡 - 检查每一个分区
public class KafkaRoundRobinPartitioner implements Partitioner {
private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);
final AtomicInteger counter = new AtomicInteger(0);
public KafkaRoundRobinPartitioner() {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionsCount = partitions.size();
int partitionId = counter.incrementAndGet() % partitionsCount;
if (counter.get() > 65536) {
counter.set(partitionId);
}
return partitionId;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
现在我想测试每个分区有相同数量的消息。例如,如果我有一个包含32个分区的主题,并且发送了32条消息给这个主题,我预计每个分区都只有1条消息。
我想要做的东西像下面这样:
KafkaPartitions allPartitions = new KafkaTopic("topic_name");
for (KafkaPartition partition : allPartitions) {
int msgCount = partition.getMessagesCount();
// do asserts
}
据我知道卡夫卡的Java API并没有为我们提供了这样的功能,但我可能已经失去了文件的东西。
有什么办法可以优雅地实现它吗?
更新 我发现只是一个基本的解决方案。由于我使用MULTY消费模式,我可以做的每个消费者如下:
consumer.assignment().size();
之后,我可以做:
consumer.poll(100);
并检查每一个消费者都有一个消息。在这种情况下,我不应该面对一个消费者从其分区中获得另一个消息的情况,因为由于我拥有相同数量的消费者和分区,所以Kafka应该以循环方式在消费者之间分配分区。