0
我想让x个消费者访问kafka中的指定主题,但不会消费相同的消息。我想例如...Kafka消费群集环境抵消
消费1拿起偏移1 消费者2拿起偏移2 消费1拿起偏移3 消费者2拿起偏移4
我想卡夫卡作为一个队列为这两个消费者。我注意到了group.id配置,我认为你可以使用同一个组,并且它会相应地处理它,但它看起来不像我认为的那样工作。
这里是我使用的代码...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
我希望能够在集群环境中运行该代码,并有卡夫卡是队列。我希望它拉动消息并同时进入下一个偏移量,然后下一个kafka轮询将获取下一个偏移量。这可能吗?如果是的话,我在做什么错了?
好吧,这是有道理的,但我读到,如果你有2个分区,你必须有至少2个消费者。那么如果其中一个消费者宕机一个小时会发生什么?其他消费不会拿起这些消息正确吗? –
等等,我想你会说,如果有2个分区,并且只有一个消费者,它会从两个选择?所以只是为了确保我理解正确。如果我创建两个分区并且有两个消费者,它应该解析不同的消息,如果其中一个消失了,那么另一个消费者会从两个分区中接收所有消息?如果这是正确的,你是否有一个示例来订阅特定的分区?如果是的话我会接受你的回答:)。 –
我也刚刚从kafkas网站上读到这段文字。 012f“Kafka中的消费者群体概念概括了这两个概念,与消费者群体一样,消费者群体允许您将一系列流程(消费群体成员)的处理分开。与发布 - 订阅一样,Kafka允许您向多个消费者群体广播消息“。 这表明,我正在努力完成的是可能的。只是不知道该怎么做。 –