2016-09-15

To increase parallelism, the Spark Streaming programming guide suggests setting up multiple receivers, so I tried to union a list of Flume receiver streams. This code works as expected:

    private JavaDStream<SparkFlumeEvent> getEventsWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0)
                .union(receivers.get(1))
                .union(receivers.get(2))
                .union(receivers.get(3))
                .union(receivers.get(4))
                .union(receivers.get(5));

        return unionStreams;
    }

But I don't actually know at runtime how many receivers my cluster will have. When I try to do the same thing in a loop, I get an NPE.

    private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        JavaDStream<SparkFlumeEvent> unionStreams = null;
        for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers) {
            if (unionStreams == null) {
                unionStreams = receiver;
            } else {
                unionStreams.union(receiver);
            }
        }

        return unionStreams;
    }
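
One thing worth noting about the loop: `DStream.union` returns a new stream rather than modifying the stream it is called on, so `unionStreams.union(receiver);` with the result discarded leaves `unionStreams` pointing at the first receiver only. A minimal sketch of the same discarded-return-value pattern using plain JDK types (no Spark; the helper names are hypothetical and only illustrate the pattern):

```java
import java.util.Arrays;
import java.util.List;

public class UnionPattern {

    // Buggy pattern: like DStream.union, String.concat returns a NEW
    // object; calling it without reassigning discards the result.
    static String unionAllBuggy(List<String> parts) {
        String acc = parts.get(0);
        for (String p : parts.subList(1, parts.size())) {
            acc.concat(p); // result thrown away, acc never changes
        }
        return acc;
    }

    // Fixed pattern: reassign the accumulator on every iteration.
    static String unionAllFixed(List<String> parts) {
        String acc = parts.get(0);
        for (String p : parts.subList(1, parts.size())) {
            acc = acc.concat(p);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("a", "b", "c");
        System.out.println(unionAllBuggy(parts)); // prints "a"
        System.out.println(unionAllFixed(parts)); // prints "abc"
    }
}
```

The same reassignment (`unionStreams = unionStreams.union(receiver);`) applies to the DStream loop, although that alone may not be the only issue here.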

Error:

    16/09/15 17:05:25 ERROR JobScheduler: Error in job generator
    java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    16/09/15 17:05:25 INFO MemoryStore: ensureFreeSpace(15128) called with curMem=520144, maxMem=555755765
    16/09/15 17:05:25 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 14.8 KB, free 529.5 MB)
    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

What is the correct way to do this?


I also tried `JavaDStream unionStreams = jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));` but it doesn't compile. Error:(118,57) java: no suitable method found for union(... – Kevin

Answer


Could you please try the code below? It should solve your problem:

    private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        // Declare the list with element type JavaDStream (the supertype) so it
        // matches the List<JavaDStream<T>> parameter of jssc.union below.
        List<JavaDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        return jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));
    }
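
Why the element type of the list matters here: `JavaStreamingContext.union(first, rest)` expects a `List<JavaDStream<T>>`, and Java generics are invariant, so a `List<JavaReceiverInputDStream<SparkFlumeEvent>>` is not an acceptable argument even though every element in it is a `JavaDStream`. That is most likely why the `jssc.union(...)` attempt mentioned in the question comments failed to compile. A JDK-only sketch of the same invariance issue (the class and method names are hypothetical stand-ins, not Spark types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InvarianceSketch {

    // Stand-ins for JavaDStream and its subclass JavaReceiverInputDStream;
    // only the subtype relationship matters for this sketch.
    static class Stream {}
    static class ReceiverStream extends Stream {}

    // Mirrors the shape of JavaStreamingContext.union(first, rest), which
    // takes the supertype element type in its list parameter.
    static int union(Stream first, List<Stream> rest) {
        return 1 + rest.size(); // number of streams being unioned
    }

    public static void main(String[] args) {
        List<ReceiverStream> receivers = new ArrayList<>(
                Arrays.asList(new ReceiverStream(), new ReceiverStream(), new ReceiverStream()));

        // The next line would NOT compile, because Java generics are
        // invariant: a List<ReceiverStream> is not a List<Stream>.
        // union(receivers.get(0), receivers.subList(1, receivers.size()));

        // Declaring the list with the supertype element type, as the
        // answer does, makes the call compile.
        List<Stream> streams = new ArrayList<>(receivers);
        System.out.println(union(streams.get(0), streams.subList(1, streams.size()))); // prints 3
    }
}
```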

Got this error: "Error:(133,85) java: unreachable statement". IntelliJ complains: "incompatible equality constraint: SparkFlumeEvent and T" – Kevin


Ignore the unreachable-statement error; that was caused by a stray `;;` in the originally posted code that I hadn't noticed. But IntelliJ still gives the other error. It builds successfully and appears to work. Any idea why IntelliJ is complaining? – Kevin


It was an IntelliJ cache problem :-/ Very frustrating. Resolved by following the advice here: http://stackoverflow.com/questions/15052772/intellij-show-errors-in-scala-source-files-but-the-project-compiles-successfully. Thanks for your help. – Kevin