2017-09-19 28 views

回答

3

@KafkaListener要求KafkaListenerContainerFactory@Bean,而这又是基于ConsumerFactory。而DefaultKafkaConsumerFactory接受消费者CONFIGS的Map<String, Object>

@Configuration 
@EnableKafka 
public class KafkaConfig { 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
         kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
           new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<Integer, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); 
     ... 
     return props; 
    } 
} 

https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation

如果这一ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG是完全标准的Apache卡夫卡bootstrap.serversproperty

主机/端口对用于建立名单与Kafka集群的初始连接。客户端将使用所有服务器,而不管在此指定哪些服务器用于引导 - 此列表仅影响用于发现全套服务器的初始主机。此列表应采用host1:port1,host2:port2,...的形式。由于这些服务器仅用于初始连接以发现完整集群成员资格(可能会动态更改),因此此列表不需包含完整集合的服务器(不过,如果服务器停机,您可能需要多个服务器)。

不,您不能指向Zookeeper地址。卡夫卡不再支持这一点。

+1

新的(0.9+)客户端目标是避免客户端代码必须与动物园管理员交谈。但是,您可以(我假设)自己查询zookeeper以获取代理列表并填充引导程序服务器配置属性。 –

+0

谢谢阿尔乔姆。另外我相信Kafka有一个叫做_consumer-offset的话题,用它来处理服务器和客户端之间的偏移量管理。所以如果我使用与多个zookeepers对应的多个服务器,Will spring-kafka会特别处理偏移量管理。 – daemon54

+1

春季卡夫卡不管理偏移量。一切都是在卡夫卡经纪人完成的。我们只提供'consumer.group'并依赖为Consumer实例提供的偏移量。 –

相关问题