2016-06-29 218 views
0

我在Apache Kafka上构建了排队系统。应用程序将产生消息到特定的Kafka topic,并且在消费者端,我必须消耗为该主题产生的所有记录。
我写消费者使用新的Java Consumer Api。 代码看起来像永久运行kafka消费者(新消费者API)

Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBrokerIp+":9092"); 
        props.put("group.id",groupId); 
        props.put("enable.auto.commit", "true"); 
        props.put("session.timeout.ms", "30000"); 
        props.put("auto.offset.reset", "earliest"); 
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("consumertest")); 
        while (true) { 
         ConsumerRecords<String, String> records = consumer.poll(100); 
         for (ConsumerRecord<String, String> record : records){ 
          System.out.println("Data recieved : "+record.value()); 
          } 
        } 

在这里,我要永远跑消费者,这样的纪录推到卡夫卡的话题生产者应立即使用并处理。
所以我的困惑是,它是一个正确的方式来使用无限的while循环(如在示例代码中)来消费数据?

回答

0

它适用于我,但您可能想要将您的内部循环放入try/catch块以防万一您抛出任何异常。如果断开连接,请考虑定期重新连接任务。

0

是的,你可以使用无限循环。其实,这不是一个繁忙的循环。在每次轮询期间,如果数据不可用,呼叫会等待给定的时间。

long millisToWait = 100; 
consumer.poll(millisToWait); 

新客户自动处理网络通信问题。确保在关机时消费者优雅地关闭。