2017-07-13 67 views
0

提供了一个有n个分区的主题。 Spring的KafkaConsumer侦听器可以通过哪种方式一次侦听来自同一个/一个分区的多个消息?单个Spring的KafkaConsumer侦听器可以侦听来自同一个/一个分区的多条消息吗?

我试过ConcurrentKafkaListenerContainerFactory通过设置setBatchListener(true);但消费者已经开始消费来自不同分区而不是一个分区的多个消息。

public class BatchReceiverConfig { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 

     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     // maximum records per batch receive 
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(2); 
     // enable batch listeners 
//  factory.setBatchListener(true); 

     return factory; 
    } 

    @Bean 
    public BatchReceiver receiver() { 
     return new BatchReceiver(); 
    } 
} 


/* Listener */ 
@KafkaListener(id = "batch-listener", topics = TOPIC_TEST_BATCH) 
    public void receive(List<String> data, 
         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, 
         @Header(KafkaHeaders.OFFSET) List<Long> offsets) { 

     LOGGER.info("start of batch receive"); 
     for (int i = 0; i < data.size(); i++) { 
      LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i), 
        partitions.get(i) + "-" + offsets.get(i)); 
      // handle message 

      latch.countDown(); 
     } 
     LOGGER.info("end of batch receive"); 
    } 
+0

重组你的问题:“怎么能一个侦听器只从一个分区消耗的消息?”。那是对的吗? – Arek

+0

这是正确的,但在春季卡夫卡模块的上下文中。 – Rites

回答

0

只要将只有一个组中的用户,那么你可以设置该ConcurrentKafkaListenernconcurrencyLevel

+0

它没有工作,我相信'max.poll.records'是决定提取记录数量的属性。 – Rites

+0

演示如何配置Spring Kafka和Kafka消费者本身。 – Arek

+0

本身已添加片段。 – Rites

0

如果使用组管理,卡夫卡将分配分区的消费者。每个使用者(或使用并发时的线程)将获得0个,1个或多个分区,具体取决于有多少个使用者和分区。

如果你想要一个特定的消费者从只有一个分区得到的消息,你必须指定分区的自己,而不是让卡夫卡做作业。

如果使用并发容器;您将不得不分配与并发相同数量的分区,因此每个线程只能获得一个分区。

+0

在问题中发布的代码片段中,分区数量=并发性,即使它不起作用。另外,消费者数量=分区数量。 – Rites

+0

如果您正在使用组管理,分区分布将需要时间;最初,消费者将获得他们全部;唯一的保证是自己分配分区,而不是使用组管理。 'props.put(ConsumerConfig.GROUP_ID_CONFIG,“batch4”);' –

+0

在集群环境中,很难将其定义为相同的代码部署到所有节点。有没有办法动态配置它? – Rites

相关问题