I am running into the following exception:

Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable

I have enabled external checkpointing, and the class below is used in the processing. The error says this class is not serializable:
class EventhubsStateTransformComponent(inStream: DStream[EventhubsEvent]) extends PipelineComponent with Logging {
  def process() = {
    inStream.foreachRDD(rdd => {
      if (rdd.isEmpty()) {
        logInfo("Extract outstream is empty...")
      } else {
        logInfo("Extract outstream is not empty...")
      }
    })
    // TODO eventhubsId is hardcoded
    val eventhubsId = "1"
    val statePairStream = inStream.map(eventhubsEvent =>
      ((eventhubsId, eventhubsEvent.partitionId), eventhubsEvent.eventOffset))
    val eventhubsEventStateStream =
      statePairStream.mapWithState(StateSpec.function(EventhubsStreamState.updateStateFunc _))
    val snapshotStateStream = eventhubsEventStateStream.stateSnapshots()
    val out = snapshotStateStream.map(state => {
      (state._1._1, state._1._2, state._2, System.currentTimeMillis() / 1000)
    })
    outStream = out
  }
}
P.S. EventhubsEvent is a case class.
=========================================================
EDIT: after I made this class extend Serializable, the exception disappeared. But I don't understand why we need to make our own class extend Serializable. Does this mean that if a class contains a foreachRDD operation, enabling checkpointing triggers validation of the code and requires the entire object containing the foreachRDD to be serializable? As far as I remember, in some cases only the objects referenced inside the foreachRDD scope need to be serializable.
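What is likely happening is closure capture: a lambda that reads an instance field or calls an instance method (such as logInfo inside foreachRDD above) compiles to an access through `this`, so the whole enclosing object is pulled into the serialized closure. A minimal sketch of this effect, with no Spark involved — the class and field names here are illustrative, not from the original code:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {

  // Deliberately NOT Serializable, like the pipeline component in the question.
  class Outer {
    val eventhubsId = "1"

    // Reads the field through `this`, so the lambda captures the whole
    // (non-serializable) Outer instance.
    def badClosure: String => String = s => eventhubsId + s

    // Copying the field to a local val first means the lambda captures
    // only the String, not `this`.
    def goodClosure: String => String = {
      val id = eventhubsId
      s => id + s
    }
  }

  // Returns true if the object survives Java serialization.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val o = new Outer
    println(canSerialize(o.badClosure))  // expect false: captured Outer is not serializable
    println(canSerialize(o.goodClosure)) // expect true: only a String is captured
  }
}
```

So rather than making the whole component extend Serializable, the more common fix is to copy the needed fields into local vals (and avoid instance-method calls) inside the closures passed to Spark, so only those small values are captured.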
Serialization stack:
- object not serializable (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent, value: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent@2a92a7fd)
- field (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, name: $outer, type: class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent)
- object (class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@..., ...))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.PluggableInputDStream, org.apache.spark.streaming.dstream.PluggableInputDStream@...)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData
... (remainder of stack trace truncated)
Does "EventHub" contain fields that might not be serializable? – Yuval Itzchakov
@YuvalItzchakov Hi, do you mean EventhubsEvent? It is a nested case class with some basic types and Scala Options, e.g. Option[String], Option[]
Have you dug into the stack trace? Spark tells you exactly which field is causing the problem.