我有一个侦听器需要从具有相同主题的多个kafka服务器读取,这些服务器都在一个zookeeper下配置。我如何从这些多台服务器上读取数据。你能帮忙吗?如何使用spring-kafka为侦听器传递多个引导程序服务器
取而代之的是卡夫卡服务器,我可以指向zookeeper吗?
我有一个侦听器需要从具有相同主题的多个kafka服务器读取,这些服务器都在一个zookeeper下配置。我如何从这些多台服务器上读取数据。你能帮忙吗?如何使用spring-kafka为侦听器传递多个引导程序服务器
取而代之的是卡夫卡服务器,我可以指向zookeeper吗?
该@KafkaListener
要求KafkaListenerContainerFactory
@Bean
,而这又是基于ConsumerFactory
。而DefaultKafkaConsumerFactory
接受消费者CONFIGS的Map<String, Object>
:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
如果这一ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
是完全标准的Apache卡夫卡bootstrap.servers
property:
主机/端口对用于建立名单与Kafka集群的初始连接。客户端将使用所有服务器,而不管在此指定哪些服务器用于引导 - 此列表仅影响用于发现全套服务器的初始主机。此列表应采用host1:port1,host2:port2,...的形式。由于这些服务器仅用于初始连接以发现完整集群成员资格(可能会动态更改),因此此列表不需包含完整集合的服务器(不过,如果服务器停机,您可能需要多个服务器)。
不,您不能指向Zookeeper地址。卡夫卡不再支持这一点。
新的(0.9+)客户端目标是避免客户端代码必须与动物园管理员交谈。但是,您可以(我假设)自己查询zookeeper以获取代理列表并填充引导程序服务器配置属性。 –
谢谢阿尔乔姆。另外我相信Kafka有一个叫做_consumer-offset的话题,用它来处理服务器和客户端之间的偏移量管理。所以如果我使用与多个zookeepers对应的多个服务器,Will spring-kafka会特别处理偏移量管理。 – daemon54
春季卡夫卡不管理偏移量。一切都是在卡夫卡经纪人完成的。我们只提供'consumer.group'并依赖为Consumer实例提供的偏移量。 –