我在Spark上遇到了一个关于序列化的非常奇怪的问题。 的代码如下:火花上的序列化异常
class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
def infer(document: RDD[Document]): RDD[DocumentParameter] = {
val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
docs
}
}
其中文档被定义为:
class Document(val tokens: SparseVector[Int]) extends Serializable
和DocumentParameter是:
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable
object DocumentParameter extends Serializable
{
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,
Array.ofDim[Float](numOfTopics))
}
SparseVectoris在breeze.linalg.SparseVector
序列化类。
这是一个简单的地图程序,所有的类都是可序列化,但我得到这个异常:
org.apache.spark.SparkException: Task not serializable
但是当我删除numOfTopics
参数,即:
object DocumentParameter extends Serializable
{
def apply(document: Document) = new DocumentParameter(document,
Array.ofDim[Float](10))
}
和这样称呼它:
val docs = documents.map(DocumentParameter.apply)
它似乎还可以。
类型Int不可序列化吗?但我确实看到有些代码是这样写的。
我不知道如何解决这个错误。
#更新时间:#:
谢谢@samthebest。我会添加更多关于它的细节。
stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more
由于堆栈跟踪给出异常的一般信息,我将其删除。
我在spark-shell中运行代码。
// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)
你可以给我一些关于可串行化的教程或技巧吗?
谢谢。它按照你的建议工作。另外,我找到另一种解决这个问题的方法:在'val sc:SparkContext'之前加'@ transient',然后''''''''''''''''''不会被序列化。 – superhan
我不同意你应该完全避免在你的类中存储'SparkContext'(但是仍然是upvoted)。如果你不把它们存储在范围内,那么你最终可能会得到膨胀参数(即使在使用隐式参数时也很丑陋)。唯一的选择是在某个地方粘上一些全局单例,从而产生它自己的问题(可怕的空指针)。 – samthebest