2015-06-08 50 views
4

我试图运行下面的简单代码星火:org.apache.spark.SparkException:任务不序列化 - JavaSparkContext

Gson gson = new Gson(); 
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json"); 

JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>() 
{ 
    private static final long serialVersionUID = -78238876849074973L; 

    @Override 
    public SupplierDTO call(String str) throws Exception 
    { 
     return gson.fromJson(str, SupplierDTO.class); 
    } 
}); 

但在执行stringRdd.map声明它抛出以下错误:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478) 
at org.apache.spark.rdd.RDD.map(RDD.scala:288) 
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) 
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32) 
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71) 
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 7 more 

这里'jsc'是我使用的JavaSparkContext对象。 据我所知,JavaSparkContext不是一个Serializable对象,不应该在将被发送给Spark工作者的任何函数中使用它。

现在,我无法理解的是,JavaSparkContext的实例如何被发送给工人?我应该更改我的代码以避免这种情况?

+0

你确定这是所有的登录?我认为这是Gson导致的问题,因为我之前遇到过同样的问题。 – nhahtdh

+0

这是完整的日志。 –

+0

你可以在这里粘贴SupplierDTO类吗? – Sathish

回答

5

gson引用将外部类'拉'到闭包的范围中,并将其全部对象图与它一起使用。

在这种情况下,封闭内创建GSON对象:

public SupplierDTO call(String str) throws Exception { 
    Gson gson = Gson(); 
    return gson.fromJson(str, SupplierDTO.class); 
} 

还可以声明火花背景transient

如果GSON实例的创建是昂贵的,可以考虑使用mapPartitions代替map

+1

在调用方法内创建Gson对象没有任何区别。您提到的其他两个选项都没有。请帮忙。 –

+0

@ArkaGhosh添加'transient'到'sparkContext'的声明应该 - 至少 - 改变抛出的异常。 – maasg

+0

你能否提一下'transient'注释的完整分类名称。 –

3

对我来说,我下定决心使用下列选项之一这个问题:

  1. 如上所述,通过声明SparkContext作为transient
  2. 您也可以尝试使对象GSON静态 static Gson gson = new Gson();

请参阅文档Job aborted due to stage failure: Task not serializable

t Ø看其他可用的选择来解决这个probleme

+0

我也有同样的问题。我将我的上下文引用标记为短暂的。我工作 – BDR

+0

静态Gson工作正常 –

0

您可以使用下面的代码,而不是9行(return gson.fromJson(str, SupplierDTO.class);

return new Gson().fromJson(str, SupplierDTO.class);//this is correct 

,并删除第1行(Gson gson = new Gson();

相关问题