2017-05-07

Why does this Spark code throw java.io.NotSerializableException? I want to call a method of a companion object inside an RDD transformation. Why does the following not work?

import org.apache.spark.rdd.RDD 
import spark.implicits._ 
import org.apache.spark.sql.{Encoder, Encoders} 

class Abc { 
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) } 
} 

object Abc { 
    def fn(x: Int): Double = { x.toDouble } 
} 

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc] 

new Abc().transform(sc.parallelize(1 to 10)).collect 

The code above throws a java.io.NotSerializableException:

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) 
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:369) 
    at Abc.transform(<console>:19) 
    ... 47 elided 
Caused by: java.io.NotSerializableException: Abc 
Serialization stack: 
     - object not serializable (class: Abc, value: [email protected]) 
     - field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc) 
     - object (class Abc$$anonfun$transform$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) 
    ... 57 more 

Even defining an Encoder for the class Abc does not help here. But the more important question is: why is serialization of an object of class Abc attempted at all? My first thought was that the companion object is a singleton object of the class, so perhaps serializing it is what's being attempted. But that doesn't seem to be the case, because when I call Abc.fn from another class:

class Xyz { 
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) } 
} 

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz] 

new Xyz().transform(sc.parallelize(1 to 10)).collect 

I get a java.io.NotSerializableException: Xyz instead.
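The `$outer` field in the stack trace is the clue: the function passed to `map` is compiled to an anonymous class holding a reference to the enclosing instance (`Abc` or `Xyz`), so Spark tries to serialize that instance along with the closure. The same effect can be reproduced without Spark, since Spark's default closure serializer is plain Java serialization. A minimal sketch (the names `Owner` and `CaptureDemo` are illustrative, not from the post):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Owner(val bias: Double) {            // not Serializable, like Abc/Xyz above
  // this lambda reads a field, so it closes over the whole Owner instance
  def makeAdder: Int => Double = x => x + bias
}

object CaptureDemo {
  // try to Java-serialize a value, reporting success or failure
  def serializable(v: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(v); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val f = new Owner(1.0).makeAdder
    println(serializable(f))   // false: the closure drags Owner along
  }
}
```

In the question's code the closure does not even read instance state, yet the trace still shows a `$outer` field of type `Abc`; on the Scala version this trace comes from, anonymous function classes defined inside instance methods carried that outer reference regardless, and Spark's ClosureCleaner evidently did not null it out here.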


The work does not happen on the edge node; the class (or object) has to be serialized so that the data nodes can run it. –


Because you haven't actually defined serialise/deserialise functions, and even implementing the appropriate interface is not that simple? (https://docs.oracle.com/javase/7/docs/api/java/io/NotSerializableException.html) By default, serialization only has access to what is publicly settable and gettable; beyond that you need to provide your own functions. – Christopher

Answers


The main abstraction in Spark is the RDD, which is a collection partitioned across the nodes of the cluster. When we run an operation on an RDD, the closure is serialized on the driver node and shipped to the appropriate worker nodes; the workers then deserialize and execute it.

In your case, the class Abc cannot be serialized and distributed to the worker nodes. You need to make the class Abc serializable:

class Abc extends Serializable { 
    def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) } 
} 
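The effect of the marker interface can be checked without a cluster, since serializing the closure on the driver uses plain Java serialization by default. A minimal sketch (class names `Plain`, `Marked`, and `FixDemo` are illustrative):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class Plain                          // like the original Abc: no marker interface
class Marked extends Serializable    // like the fixed Abc

object FixDemo {
  // try to Java-serialize a value, reporting success or failure
  def serializable(v: AnyRef): Boolean =
    try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(v); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(serializable(new Plain))   // false: same failure Spark reports
    println(serializable(new Marked))  // true: the marker interface fixes it
  }
}
```

An alternative worth considering is restructuring so that no instance is captured at all, e.g. putting transform on the companion object; but adding Serializable, as above, is the direct fix for the code as written.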