1
代码启用检查点流:java.io.NotSerializableException在星火下面
def main(args: Array[String]) {
val sc = new SparkContext
val sec = Seconds(3)
val ssc = new StreamingContext(sc, sec)
ssc.checkpoint("./checkpoint")
val rdd = ssc.sparkContext.parallelize(Seq("a","b","c"))
val inputDStream = new ConstantInputDStream(ssc, rdd)
inputDStream.transform(rdd => {
val buf = ListBuffer[String]()
buf += "1"
buf += "2"
buf += "3"
val other_rdd = ssc.sparkContext.parallelize(buf) // create a new rdd
rdd.union(other_rdd)
}).print()
ssc.start()
ssc.awaitTermination()
}
,并抛出异常:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected])
- field (class: com.mirrtalk.Test$$anonfun$main$1, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
- object (class com.mirrtalk.Test$$anonfun$main$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
当我删除代码ssc.checkpoint( “./检查点”),该应用程序可以正常工作,但我需要启用检查点。
如何解决这个问题时启用检查点?
不是问题的事实'StreamingContext'不能序列,而且他用它转化里面? –
@YuvalItzchakov这是我的第一个想法,但它没有用于转换(它只用于流级别),所以它不是直接的问题。看起来这个问题在这里更加微妙,在检查点时,.StreamingContext被拖拽。 – zero323
在驱动程序端或工作端调用'transform'吗? –