
Kafka - check messages count per partition

I have implemented a round-robin partitioner as follows:

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.log4j.Logger;

public class KafkaRoundRobinPartitioner implements Partitioner {

    private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);

    final AtomicInteger counter = new AtomicInteger(0);

    public KafkaRoundRobinPartitioner() {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionsCount = partitions.size();

        int partitionId = counter.incrementAndGet() % partitionsCount;
        // Reset the counter periodically so it never overflows into negative values,
        // which would make the modulo above produce a negative partition id.
        if (counter.get() > 65536) {
            counter.set(partitionId);
        }
        return partitionId;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
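For reference, here is a minimal sketch of how such a partitioner can be plugged into a producer. The class name PartitionerConfigExample, the broker address, and the topic name are illustrative placeholders, not part of the original setup:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PartitionerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Point the producer at the custom partitioner; without this Kafka uses its default one.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaRoundRobinPartitioner.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic_name", "{}".getBytes())); // placeholder topic
        }
    }
}

Kafka instantiates the partitioner itself from the partitioner.class property, which is why the class needs the no-argument constructor shown above.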

Now I want to test that each partition receives the same number of messages. For example, if I have a topic with 32 partitions and send 32 messages to it, I expect each partition to hold exactly 1 message.

I would like to do something like the following:

KafkaPartitions allPartitions = new KafkaPartitions("topic_name"); // hypothetical API
for (KafkaPartition partition : allPartitions) {
    int msgCount = partition.getMessagesCount();
    // do asserts
}

As far as I know, the Kafka Java API does not provide such functionality out of the box, but I may have missed something in the documentation.

Is there any elegant way to achieve this?

UPDATE: So far I have found only a basic solution. Since I use the multi-consumer model, I can do the following for each consumer:

consumer.assignment().size(); 

After that, I can call:

consumer.poll(100); 

and check that every consumer got exactly one message. In this case I should not run into a situation where one consumer receives a message from another consumer's partition, because, since I have the same number of consumers and partitions, Kafka should distribute the partitions among the consumers in a round-robin fashion.
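Under those assumptions, here is a sketch of that check; the class name RoundRobinCheck is made up for illustration:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RoundRobinCheck {

    // Assumes every consumer has already joined the same group, subscribed to
    // the topic, and the rebalance has finished, so each assignment() is populated.
    public static void assertOneMessageEach(List<KafkaConsumer<byte[], byte[]>> consumers) {
        for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
            // With as many consumers as partitions, each consumer should own exactly one partition.
            if (consumer.assignment().size() != 1) {
                throw new AssertionError("expected exactly one partition per consumer");
            }
            // After one message has been sent per partition, a poll should return exactly one record.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            if (records.count() != 1) {
                throw new AssertionError("expected exactly one message per consumer");
            }
        }
    }
}

Note that in a real test a single poll(100) can come back empty before the messages arrive, so you would poll in a loop with a deadline rather than once.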

Answers


In the end, I came up with the following.

My KafkaConsumer worker has the following code:

public void run() {
    while (keepProcessing) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // do processing
                consumer.commitSync();
            }
        } catch (Exception e) {
            logger.error("Couldn't process message", e);
        }
    }
}

And in my test I decided to verify that each consumer performed exactly one commit, which means the messages were distributed in a round-robin fashion. Test code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public class KafkaIntegrationTest {

    // Initialize the count before the latch: if the latch were built from an
    // uninitialized (zero) field, await() would return immediately.
    private final int consumersAndPartitionsNumber = Config.getConsumerThreadAmount(); // it's 5
    private final CountDownLatch latch = new CountDownLatch(consumersAndPartitionsNumber);

    @Test
    public void testPartitions() throws Exception {
        KafkaMessageQueue kafkaMessageQueue = new KafkaMessageQueue(); // just a class with Producer configuration
        String groupId = Config.getGroupId();
        List<KafkaConsumer<byte[], byte[]>> consumers = new ArrayList<>(consumersAndPartitionsNumber);

        for (int i = 0; i < consumersAndPartitionsNumber; i++) {
            consumers.add(spy(new KafkaConsumer<>(KafkaManager.createKafkaConsumerConfig(groupId))));
        }

        ExecutorService executor = Executors.newFixedThreadPool(consumersAndPartitionsNumber);
        for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
            executor.submit(new TestKafkaWorker(consumer));
        }

        for (int i = 0; i < consumersAndPartitionsNumber; i++) {
            // send messages to topic
            kafkaMessageQueue.send(new PostMessage("pageid", "channel", "token", "POST", null, "{}"));
        }

        latch.await(60, TimeUnit.SECONDS);

        // Each consumer should have committed exactly once, i.e. received exactly one message.
        for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
            verify(consumer).commitSync();
        }
    }

    class TestKafkaWorker implements Runnable {

        private final KafkaConsumer<byte[], byte[]> consumer;
        private boolean keepProcessing = true;

        TestKafkaWorker(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
            consumer.subscribe(Arrays.asList(Config.getTaskProcessingTopic()));
        }

        public void run() {
            while (keepProcessing) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        consumer.commitSync();
                        // Stop after the first record and signal the main thread.
                        keepProcessing = false;
                        latch.countDown();
                    }
                } catch (Exception e) {
                    // ignore and keep polling
                }
            }
        }
    }
}

Alternatively, you can use seekToBeginning() and seekToEnd() and compute the difference between the offsets you get for each partition.
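A minimal sketch of that idea, assuming a 0.10+ consumer where seekToBeginning()/seekToEnd() accept a collection of partitions; the class name PartitionMessageCounts is made up. The message count of a partition is its end offset minus its beginning offset:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionMessageCounts {

    // Returns the number of messages currently stored in each partition of the topic,
    // computed as (end offset - beginning offset).
    public static Map<TopicPartition, Long> count(KafkaConsumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(topic, info.partition()))
                .collect(Collectors.toList());

        // assign() (not subscribe()) so we can seek freely without joining a group.
        consumer.assign(partitions);

        consumer.seekToBeginning(partitions);
        Map<TopicPartition, Long> beginnings = new HashMap<>();
        for (TopicPartition tp : partitions) {
            beginnings.put(tp, consumer.position(tp));
        }

        consumer.seekToEnd(partitions);
        Map<TopicPartition, Long> counts = new HashMap<>();
        for (TopicPartition tp : partitions) {
            counts.put(tp, consumer.position(tp) - beginnings.get(tp));
        }
        return counts;
    }
}

With this, the test reduces to asserting that every value in the returned map is equal (1 in the 32-partitions/32-messages example). Note that it counts messages currently retained in the log, so it assumes no retention-based deletion happened between producing and checking.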