2016-11-28 38 views
1

以下是在尝试将作业分派给执行程序时导致java.io.NotSerializableException的代码。在映射JavaRDD时获取java.io.NotSerializableException

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() { 

     /** 
     * Serial Version Id 
     */ 
     private static final long serialVersionUID = 6766320395808127072L; 

     @Override 
     public String call(Row row) throws Exception { 
      return row.mkString(dataFormat.getDelimiter()); 
     } 
    }); 

然而,当我这样做,任务成功连载:

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
List<String> dataList = rddToWrite.collect().stream().parallel() 
          .map(row -> row.mkString(dataFormat.getDelimiter())) 
          .collect(Collectors.<String>toList()); 
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext()); 
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList); 

任何人都可以请帮我指出我在做什么错在这里?

编辑: dataFormat是包含此代码的函数写入的类中的私有成员字段。它是DataFormat类的一个对象,它定义了两个字段,即spark数据格式(例如“com.databricks.spark.csv”)和分隔符(例如“\ t”)。

+0

什么'dataFormat'? –

+0

'dataFormat'是一个局部变量还是包含类的字段? –

回答

3

通过new Function ...创建的匿名类需要包封实例的引用,和序列化的功能需要串行化外围实例,包括dataFormat所有其他字段。如果该类未标记为Serializable,或者具有任何不可序列化的非transient字段,则该字段不起作用。即使它确实如此,它却默默无闻地表现得更糟。

不幸的是,要完全解决这个,你需要创建一个名为静态内部类(或只是一个单独的类),它甚至可以不是本地的(因为无论匿名还是local classes in Java可以是静态的):

static class MyFunction extends Function<Row, String> { 
    private String delimiter; 
    private static final long serialVersionUID = 6766320395808127072L; 

    MyFunction(String delimiter) { 
     this.delimiter = delimiter; 
    } 

    @Override 
    public String call(Row row) throws Exception { 
     return row.mkString(delimiter); 
    } 
} 

然后

JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter())); 
+0

谢谢!这解决了它:) – ologn13

3

当您访问dataFormat,这意味着this.dataFormat。 所以火花会尝试序列化this并遇到NotSerializableException

设法让像一个本地副本:

DataFormat dataformat = this.dataformat; 
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD(); 
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ... 

欲了解更多信息,请参阅 http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark

+1

至少在一个快速测试中,即使是一个匿名类,它也不会访问封装实例的任何方法或字段,但仍然会引用它,所以会尝试序列化它。也许我做错了,虽然... –

+0

Alexey是对的!这仍然是序列化封闭的实例。造成同样的问题。 – ologn13

+0

是的,他是对的。我对java中的匿名函数有一些误解。 –