3

下面两个代码片段发布消息的行为有什么不同?春季卡夫卡分区

方法1

Message<String> message = MessageBuilder.withPayload("testmsg") 
     .setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build(); 

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message); 

方法2

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "testmsg"); 

主题配置:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 
Topic:test PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

观察:

如果有3个消费者,每个分区一个;方法1导致单个消费者从单个分区消耗的所有消息。方法2;消费在三个分区/消费者之间平均分配。

回答

4

但是你在代码中有一个答案。 第一个和topic一起提供messageKey

messageKey真的是用来确定目标分区,如果没有明确规定是:

/** 
* computes partition for given record. 
* if the record has partition returns the value otherwise 
* calls configured partitioner class to compute the partition. 
*/ 
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { 
    Integer partition = record.partition(); 
    return partition != null ? 
      partition : 
      partitioner.partition(
        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); 
} 

其中DefaultPartitioner做到这一点:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
int numPartitions = partitions.size(); 
if (keyBytes == null) { 
    int nextValue = nextValue(topic); 
     ... 
} else { 
    // hash the keyBytes to choose a partition 
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 
} 

因此,与同key所有邮件都发送到相同的分区。否则,他们被置于主题循环方式。