
I am hitting the following exception:

Exception in thread "main" java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable

I have enabled external checkpointing, and this class is used in the processing. The exception says this class is not serializable:

import org.apache.spark.streaming.StateSpec
import org.apache.spark.streaming.dstream.DStream

class EventhubsStateTransformComponent(inStream: DStream[EventhubsEvent]) extends PipelineComponent with Logging {
  def process() = {
    inStream.foreachRDD(rdd => {
      if (rdd.isEmpty()) {
        logInfo("Extract outstream is empty...")
      } else {
        logInfo("Extract outstream is not empty...")
      }
    })
    // TODO eventhubsId is hardcoded
    val eventhubsId = "1"
    // Key by (eventhubsId, partitionId), with the event offset as the value.
    val statePairStream = inStream.map(eventhubsEvent => ((eventhubsId, eventhubsEvent.partitionId), eventhubsEvent.eventOffset))
    val eventhubsEventStateStream = statePairStream.mapWithState(StateSpec.function(EventhubsStreamState.updateStateFunc _))
    val snapshotStateStream = eventhubsEventStateStream.stateSnapshots()
    val out = snapshotStateStream.map(state => {
      (state._1._1, state._1._2, state._2, System.currentTimeMillis() / 1000)
    })
    outStream = out
  }
}

P.S. EventhubsEvent is a case class.
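(For reference, a hypothetical shape consistent with the comment thread below; the real class lives in my codebase and the field names here are made up. Scala case classes get Serializable automatically, so a tree of primitives and Options like this serializes fine on its own:)

    // Hypothetical sketch of a nested case class with primitives and Options.
    // Case classes extend Product with Serializable out of the box.
    case class OffsetInfo(offset: Option[String], sequenceNumber: Option[Long])
    case class EventhubsEvent(partitionId: String,
                              eventOffset: String,
                              offsetInfo: Option[OffsetInfo])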

=========================================================

New edit: after I made this class extend Serializable, the exception went away. But I don't understand why my own class has to extend Serializable. Does this mean that if a class contains a foreachRDD operation, checkpointing validates the code and requires the whole object enclosing the foreachRDD to be serializable? As far as I remember, in some cases only the objects referenced inside the foreachRDD scope need to be serializable.
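What is going on, roughly: with checkpointing enabled, Spark serializes the whole DStream graph, including every registered closure. A minimal self-contained sketch (Component and prefix are made-up names, not from the question) of how a closure drags its enclosing instance along:

    import org.apache.spark.streaming.dstream.DStream

    class Component(inStream: DStream[String]) { // not Serializable

      val prefix = "event-"

      def process(): DStream[String] =
        // `prefix` compiles to `this.prefix`, so this closure keeps an
        // $outer pointer to the whole Component instance. Checkpointing
        // then tries to serialize Component along with the graph and fails.
        inStream.map(e => prefix + e)
    }

So it is not foreachRDD specifically: any DStream function that references a field or method of the class captures `this`, and checkpointing forces that capture to be serializable. Copying the field to a local val inside process() (val p = prefix) would break the capture.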

Serialization stack: 
    - object not serializable (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent, value: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent@2a92a7fd)
    - field (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, name: $outer, type: class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent) 
    - object (class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream) 
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...)
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 16) 
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) 
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@..., org.apache.spark.streaming.dstream.ForEachDStream@...))
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 
0 checkpoint files]) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream) 
    - object (class org.apache.spark.streaming.dstream.PluggableInputDStream, org.apache.spark.streaming.dstream.PluggableInputDStream@...)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData 

    //.... 

Does "EventHub" contain fields that might not be serializable?


@YuvalItzchakov Hi, do you mean EventhubsEvent? It is a nested case class with some primitive types and Scala Options, e.g. Option[String], Option[]


Did you dig into the stack trace? Spark tells you exactly which field is causing the problem.

Answer


From the serialization stack:

  • object not serializable (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent, value: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent@2a92a7fd)
  • field (class: com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, name: $outer, type: class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent)
  • object (class com.testdm.spark.streaming.etl.common.pipeline.EventhubsStateTransformComponent$$anonfun$process$1, <function1>)

The `name` entry shows which field is not serializable: here it is `$outer`, the closure's reference to the enclosing EventhubsStateTransformComponent instance, so that is what you should check wherever it is used. In general, if an object is not serializable, use it only in the driver or only in the executors; do not let it be passed from the driver into functions that execute on the executors.
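Judging from the stack trace, the capture comes from the logInfo calls inside foreachRDD: they are methods of the component itself, so the closure holds `this`. A sketch of one way to break that capture, keeping the question's types (PipelineComponent, Logging, EventhubsEvent are assumed from the asker's codebase; ClosureSafeLog is a made-up helper, not part of it):

    import org.apache.log4j.Logger
    import org.apache.spark.streaming.dstream.DStream

    // Standalone logger object (hypothetical). Scala compiles object access
    // to a static reference, so a closure calling ClosureSafeLog.log.info(...)
    // captures nothing from the enclosing class.
    object ClosureSafeLog {
      @transient lazy val log: Logger =
        Logger.getLogger("EventhubsStateTransformComponent")
    }

    class EventhubsStateTransformComponent(inStream: DStream[EventhubsEvent])
        extends PipelineComponent with Logging {

      def process(): Unit = {
        inStream.foreachRDD { rdd =>
          // No this.logInfo here, so the closure no longer carries an
          // $outer pointer to the component.
          if (rdd.isEmpty()) ClosureSafeLog.log.info("Extract outstream is empty...")
          else ClosureSafeLog.log.info("Extract outstream is not empty...")
        }
        // ... rest of process() as in the question
      }
    }

Extending Serializable also works, as your edit found, but then the whole component gets written into every checkpoint, which is usually more than you want.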