I'm facing a problem with Spark Streaming where I want to use broadcast variables together with mapWithState and checkpointing. When recovering from a checkpoint, mapWithState throws java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast.
Here is the usage:
- Since I need to pass some connection objects (which are not serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
- Since we want to maintain some cached information, I use stateful streams with mapWithState
- I also use checkpointing of my streaming context
I also need to pass the broadcast connection object into mapWithState in order to fetch some data from an external source.
The flow works fine when the context is created fresh. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.
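The setup described above can be sketched roughly as follows. This is a minimal sketch, not the actual repo code: `KafkaWriter`, `someMethodCall`, the checkpoint path, and the state types are all stand-ins, and the input DStream is elided since it needs a running Spark cluster.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Stand-in for the non-serializable connection wrapper from the question.
class KafkaWriter extends Serializable {
  def someMethodCall(): String = "data-from-external-source"
}

object StatefulRunnerSketch {
  val checkpointDir = "/tmp/streaming-checkpoint" // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("stateful-with-broadcast")
    val ssc  = new StreamingContext(conf, Seconds(4))
    ssc.checkpoint(checkpointDir)

    // Broadcast the wrapper so executors can reach the connection object.
    val kafkaWriter = ssc.sparkContext.broadcast(new KafkaWriter)

    val stateSpec = StateSpec.function(
      (key: String, value: Option[Int], state: State[Int]) => {
        // The broadcast is captured in this closure; after recovery from
        // a checkpoint, kafkaWriter.value is where the cast fails.
        val external = kafkaWriter.value.someMethodCall()
        val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)
        (key, sum, external)
      })

    // Input stream elided; a real job would build a DStream of
    // (String, Int) pairs here and call .mapWithState(stateSpec) on it.
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart this deserializes the checkpointed context (including
    // the closure above) instead of calling createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```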
I have put a small code snippet, based on an example from asyncified.io, that reproduces the issue on GitHub:
- My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
- The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala
The dummy snippet of the code:
When executing
kafkaWriter.value.someMethodCall()
I get the following error:
17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Basically kafkaWriter is the broadcast variable, and kafkaWriter.value should return the value we broadcast, but instead it returns a SerializableConfiguration, which cannot be cast to the desired object.
Thanks in advance for the help!
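The failure mode itself can be illustrated in plain Scala without Spark. The two classes below are hypothetical stand-ins for SerializableConfiguration and KafkaWriter: after checkpoint recovery the restored field effectively holds the wrong deserialized object, so the cast performed when dereferencing the broadcast blows up.

```scala
// Stand-ins for the two classes involved in the cast; no Spark required.
class SerializableConfigurationLike extends Serializable
class KafkaWriterLike extends Serializable

// What effectively happens after recovery: the restored reference holds
// a SerializableConfiguration-like object, but the code expects a KafkaWriter.
val recovered: AnyRef = new SerializableConfigurationLike

val castFailed =
  try { recovered.asInstanceOf[KafkaWriterLike]; false }
  catch { case _: ClassCastException => true }

println(castFailed) // true
```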
Why do you need `KafkaWriter` inside `mapWithState`? Can you make the call before updating the state? Perhaps run it inside `mapPartitions` instead? BTW, your example seems to have a copy/paste error, as some of the code is duplicated. –
Thanks for the reply, Yuval. This is a dummy example, just to reproduce the issue. In our real use case we have to fetch some data with a JDBC call to a DB, which we use to update the state, so we have to pass the broadcast into mapWithState. Also, if you are referring to SparkStateRunner and SparkStateRunnerWithBroadcast as the duplication: the former doesn't pass the broadcast into mapWithState, while the latter does. – Saman
I see. Have you considered making the JDBC call before invoking `mapWithState`? –