2014-12-20 32 views
7

我在Spark上遇到了一个关于序列化的非常奇怪的问题。 的代码如下:火花上的序列化异常

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable 
{ 
    def infer(document: RDD[Document]): RDD[DocumentParameter] = { 
     val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
     docs 
    } 
} 

其中文档被定义为:

class Document(val tokens: SparseVector[Int]) extends Serializable 

和DocumentParameter是:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable 

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics)) 
} 

SparseVectoris在breeze.linalg.SparseVector序列化类。

这是一个简单的地图程序,所有的类都是可序列化,但我得到这个异常:

org.apache.spark.SparkException: Task not serializable 

但是当我删除numOfTopics参数,即:

object DocumentParameter extends Serializable 
{ 
    def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10)) 
} 

和这样称呼它:

val docs = documents.map(DocumentParameter.apply) 

它似乎还可以。

类型Int不可序列化吗?但我确实看到有些代码是这样写的。

我不知道如何解决这个错误。

#更新时间:#

谢谢@samthebest。我会添加更多关于它的细节。

stack trace: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
    at com.topicmodel.PLSA.infer(PLSA.scala:13) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37) 
    at $iwC$$iwC$$iwC.<init>(<console>:39) 
    at $iwC$$iwC.<init>(<console>:41) 
    at $iwC.<init>(<console>:43) 
    at <init>(<console>:45) 
    at .<init>(<console>:49) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 46 more 

由于堆栈跟踪给出异常的一般信息,我将其删除。

我在spark-shell中运行代码。

// suppose I have get RDD[Document] for docs 
val numOfTopics = 100 
val plsa = new PLSA(sc, numOfTopics) 
val docPara = plsa.infer(docs) 

你可以给我一些关于可串行化的教程或技巧吗?

回答

10

匿名函数序列化它们包含的类。当你map {doc => DocumentParameter(doc, numOfTopics)},它可以让该函数访问numOfTopics的唯一方法是序列化PLSA类。并且该类实际上不能被序列化,因为(从堆栈跟踪中可以看到)它包含不可序列化的SparkContext(如果单个群集节点可以访问上下文并且可以创建新作业在映射器内)。

一般来说,尽量避免将SparkContext存储在你的类中(编辑:或者至少要确保它非常清楚哪些类包含SparkContext以及哪种类型不包含)。最好将它作为(可能是implicit)参数传递给需要它的单个方法。或者,将函数{doc => DocumentParameter(doc, numOfTopics)}转换为与PLSA不同的类,这是一个真正可以序列化的类。

(由于多人建议,可以将SparkContext保留在课堂上,但标记为@transient,这样它就不会被序列化了,我不推荐这种方法;这意味着课程会“神奇地”改变(丢失了SparkContext),所以当你尝试从序列化作业中访问SparkContext时,你最终可能会遇到NPE。最好是保持只在“控制”代码中使用的类之间的明显区别(并可能使用SparkContext)和序列化的类在群集上运行(它不能有SparkContext))。

+2

谢谢。它按照你的建议工作。另外,我找到另一种解决这个问题的方法:在'val sc:SparkContext'之前加'@ transient',然后''''''''''''''''''不会被序列化。 – superhan

+0

我不同意你应该完全避免在你的类中存储'SparkContext'(但是仍然是upvoted)。如果你不把它们存储在范围内,那么你最终可能会得到膨胀参数(即使在使用隐式参数时也很丑陋)。唯一的选择是在某个地方粘上一些全局单例,从而产生它自己的问题(可怕的空指针)。 – samthebest

0

这确实是一个奇怪的,但我想我可以猜测问题。但首先,你没有提供解决问题的最低限度(我可以猜测,因为我以前见过这些数字是100)。这里有一些问题,你的问题:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = { 
    val docs = documents.map(doc => DocumentParameter(doc, numOfTopics)) 
} 

此方法不返回RDD[DocumentParameter]返回Unit。您必须错误地复制和粘贴代码。

其次你还没有提供整个堆栈跟踪?为什么?没有理由不提供完整的堆栈跟踪,并且带有消息的完整堆栈跟踪对于理解错误是必要的 - 需要整个错误来理解错误是什么。通常不可序列化的异常告诉你什么不可序列化。

第三,你还没有告诉我们方法infer在哪里,你是在做一个shell吗?什么是infer的包含对象/类/特征等?

无论如何,我猜想通过传递Int你导致一连串的事情被序列化,你不期望,我不能给你任何更多的信息,直到你提供最低的代码,所以我们可以完全理解你的问题。