2017-05-22 29 views
-1

我得到这个例外 星火版本:2.0.2空指针异常火花JavaPairRDD:(JavaPairRDD.scala:1028)

17/05/22 13:47:30 ERROR Executor: Exception in task 0.3 in stage 28.0 (TID 33) 
java.lang.NullPointerException 
    at com.baesystems.ai.analytics.util.RDDUtil.decideBin(RDDUtil.java:47) 
    at com.baesystems.ai.analytics.util.RDDUtil.access$400(RDDUtil.java:19) 
    at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:129) 
    at com.baesystems.ai.analytics.util.RDDUtil$1.call(RDDUtil.java:102) 
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1028) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1765) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/05/22 13:47:30 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown 
17/05/22 13:47:30 INFO MemoryStore: MemoryStore cleared 
ver commanded a shutdown 

请指引我,什么是怎么回事。此代码在我的IDE(Intellj)中正常工作;只有当我尝试以独立模式在Spark群集上运行此代码时,才会出现此问题。


下面是实际的代码:

import java.io.Serializable; 
import java.util.HashMap; 
import java.util.Map; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.mllib.linalg.Vector; 
import org.apache.spark.mllib.regression.LabeledPoint; 
import org.apache.spark.rdd.RDD; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 


public class RDDUtil implements Serializable 
{ 

    /** 
    * 
    */ 
    private static final long serialVersionUID = 1914105980520925932L; 
    private static final Logger log = LoggerFactory.getLogger(RDDUtil.class); 

    public static Map<String, java.util.HashMap<String, Integer>> histoMap = new java.util.HashMap<String, java.util.HashMap<String, Integer>>(); 
    private static HashMap<String, Integer> histogram0; 
    private static java.util.HashMap<String, Integer> histogram1; 
    private static java.util.HashMap<String, Integer> histogramBase; 
    private static int minValue=0; 
    private static int maxValue=0; 

    public static int getMinValue() { 
     return minValue; 
    } 

    public static int getMaxValue() { 
     return maxValue; 
    } 

    private static void decideBin(Double label, Double bin) 
    { 
     int vCount=0; 
     log.error("this value of bin is {} and the label is {}",bin,label); 
     histogramBase.put(bin.toString(), 0); 
     if(label==0.0) { 
      assignZero(histogram1, bin.toString()); 
      if(!checkIfPresent(histogram0,bin.toString())) { 
       vCount++; 
       histogram0.put(bin.toString(),vCount); 
      } 
     } 
     else { 
      assignZero(histogram0, bin.toString()); 

      if(!checkIfPresent(histogram1,bin.toString())) { 
       vCount++; 
       histogram1.put(bin.toString(),vCount); 
      } 
     } 
    } 

    private static boolean checkIfPresent(java.util.HashMap<String, Integer> histogram, String bin) 
    { 

     if(histogram.containsKey(bin)) { 

      int value = histogram.get(bin); 

      value++; 
      histogram.put(bin, value); 
      return true;  
     } 
     else return false; 
    } 

    private static void assignZero(java.util.HashMap<String, Integer> histogram, String bin) 
    { 
     if(!histogram.containsKey(bin)) { 
      histogram.put(bin, 0); 
     } 
    } 
    private static void calculateMin(Double bin) 
    { 
     int tempValue = bin.intValue(); 
     if(minValue>tempValue) minValue=tempValue; 

    } 
    private static void calculateMax(Double bin) 
    { 
     int tempValue = bin.intValue(); 
     if(tempValue>maxValue) maxValue=tempValue; 

    } 

    private static JavaRDD<Map<String, HashMap<String, Integer>>> getJavaRDDMap(RDD<LabeledPoint> rdd,int i) 
    { 
     long val = rdd.count(); 

     return rdd.toJavaRDD().map(new Function<LabeledPoint,Map<String, HashMap<String, Integer>>>() { 


      private static final long serialVersionUID = -7619643561853615982L; 

      @Override 
      public Map<String, HashMap<String, Integer>> call(LabeledPoint p) { 


       Double label = (Double)p.productElement(0); 
       Vector v = (Vector)p.productElement(1); 

       p.features(); 
       Double bin =0.0; 
       int vSize =p.features().size(); 
       if(i<vSize) { 
        bin = v.apply(i); 
       } 
       else { 
        bin = v.apply(vSize-1); 
       } 

       minValue=bin.intValue(); 
       calculateMin(bin); 
       calculateMax(bin); 

       log.error("this value of bin is {} and the iteration is {}",bin , i); 
       decideBin(label,bin); 

       histoMap.put("0.0", histogram0); 
       histoMap.put("1.0", histogram1); 
       histoMap.put("@@@[email protected]@@", histogramBase); 

       return histoMap; 
      } 
     }); 
    } 



    public static JavaRDD<Map<String, HashMap<String, Integer>>> computeHistogram(RDD<LabeledPoint> Data, int i) 
    { 


     histogram0 = new java.util.HashMap<String, Integer>(); 
     histogram1 = new java.util.HashMap<String, Integer>(); 
     histogramBase = new java.util.HashMap<String, Integer>(); 
     maxValue=0; 

     JavaRDD<Map<String, HashMap<String, Integer>>> jRdd = getJavaRDDMap(Data,i); 
     return jRdd; 
    } 

} 
+0

这是我的代码: – amina

回答

0

快速建议:尝试提交火花之前重新编译罐子。

你能否提供更多关于你如何提交spark工作的细节?

比较你提供的代码和堆栈跟踪,看起来行数不对齐。例如,RDDUtil行19是注释字符串(“* /”),129是空字符串,102是“覆盖”。提交火花作业之前,你是否尝试过重新编译你的jar?不匹配的行号以及代码似乎在IntelliJ中工作的事实可能是因为在Spark集群中运行的代码与在自己的计算机上运行的代码不同。

我对SO不是特别熟悉,但我相信有一种方法可以编辑您的原始文章。如果您要向其添加信息(如您的代码),该选项通常是首选。