2017-10-28 42 views
1

我试图启动一个卡夫卡服务器形式的Java创建`KafkaServer`从Java

具体来说,我怎么能翻译斯卡拉的this line入行渣华的?

private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters) 

我可以轻松地创建serverConfig中,但我似乎无法能够创建kafkaMetricsReporters参数。

注:我可以创建一个KafkaServerStartable,但我想创建一个正常的KafkaServer以避免JVM在发生错误时退出。

Apache的版本卡夫卡0.11.0.1

回答

1

kafkaMetricsReporters参数是斯卡拉Seq

您可以:

  1. 创建一个Java集合并将其转换成一个序列:

    您需要导入scala.collection.JavaConverters

    List<KafkaMetricsReporter> reportersList = new ArrayList<>(); 
    ... 
    Seq<KafkaMetricsReporter> reportersSeq = JavaConverters.asScalaBufferConverter(reportersList).asScala(); 
    
  2. 使用KafkaMetricsReporter.startReporters()方法来创建它们对于您的配置:

    作为KafkaMetricsReporter是单身,你需要使用MODULE符号来使用它:

    Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props)); 
    

另外,KafkaServer构造有从Java调用它时需要2个其他参数:

  • time可以很容易地使用new org.apache.kafka.common.utils.SystemTime()
  • threadNamePrefix是一个选项。如果导入scala.Option,你就可以调用Option.apply("prefix")

全部放在一起:

Properties props = new Properties(); 
props.put(...); 
KafkaConfig config = KafkaConfig.fromProps(props); 
Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter$.MODULE$.startReporters(new VerifiableProperties(props)); 
KafkaServer server = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters); 
server.startup(); 
+0

谢谢:) 两个问题:你的意思是'reportersList',而不是'reporters'? 'new SystemTime()'来自哪里(哪个包)? – nha

+0

另外,这意味着不会有记者,对吗? (目前我很好,虽然我不得不在某些时候创建一些) – nha

+1

我已经更新了我的答案。 'startReporters()'可能是你想用的 –