2017-08-21 232 views
0

我正在寻找在我的应用程序中的kafka主题上设置多个监听器。以下是我的设置。它应该被两个组消费,但它只被一个听众消费。我在这里错过了什么?多位消费者使用spring kafka

@Bean 
public Map<String, Object> consumerConfigs() { 
    Map<String, Object> props = new HashMap<String, Object>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); 

    return props; 
} 

@Bean 
public ConsumerFactory<String, String> consumerFactory() { 
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory(consumerConfigs()); 
    return consumerFactory; 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("notificationFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> notificationFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 
    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@Bean("insertContainerFactory") 
public ConcurrentKafkaListenerContainerFactory<String, String> insertContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(100); 

    factory.setConsumerFactory(consumerFactory()); 
    return factory; 
} 

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", group = "insert_listener", containerFactory = "insertContainerFactory") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

@KafkaListener(id = "notification_listener", topics = "${kafka.topic.readlocation}", group = "notification_listener",containerFactory="notificationFactory") 
public void receiveForNotification(String message) { 
    locationProcessor.processNotificationMessage(message); 
} 

编辑:下面是工作

@KafkaListener(id = "insert_listener", topics = "${kafka.topic.readlocation}", groupId = "insert_listener") 
public void receiveForInsert(String message) { 
    locationProcessor.insertLocationData(message); 
} 

回答

0

你需要为每个不同的group.id代码; group属性不是group.id - 请参阅javadocs。在即将发布的1.3版本中,有一个新的groupId属性,我们也可以使用id作为一个组(如果存在)。

对于较早的版本,您需要针对每个消费者工厂。

+0

完美的工作。谢谢。我升级到1.3版本并使用group.id。 –

相关问题