3

I am facing a problem with Spark Streaming here, where I want to use a broadcast variable together with mapWithState and checkpointing in Spark. mapWithState gives a java.lang.ClassCastException (org.apache.spark.util.SerializableConfiguration cannot be cast) when recovering from a checkpoint.

Here is how I use them:

  • Since I need to pass some connection objects (which are not serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
  • Since we want to maintain some cached information, I use stateful streaming with mapWithState
  • I am also using checkpointing of my streaming context

I also need to pass the broadcast connection object into mapWithState in order to fetch some data from an external source.

The flow works fine when the context is created for the first time. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.

I have put a small code snippet based on an example from asyncified.io to reproduce the issue on github:

  • My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
  • The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

Dummy snippet of the code:
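(The exact code lives in the repository linked above; the following is only a condensed sketch of the same setup, where the socket source, the Int-typed state, and someMethodCall stand in for the real wiring.)

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Stand-in for the real KafkaWriter, which lazily wraps a
// non-serializable connection.
class KafkaWriter(brokers: String) extends Serializable {
  def someMethodCall(): String = brokers
}

object SparkStatefulRunnerWithBroadcast {
  val checkpointDir = "/tmp/checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("stateful-with-broadcast")
    val ssc = new StreamingContext(conf, Seconds(4))
    ssc.checkpoint(checkpointDir)

    // The broadcast is created only when the context is built from scratch.
    val kafkaWriter: Broadcast[KafkaWriter] =
      ssc.sparkContext.broadcast(new KafkaWriter("localhost:9092"))

    val stateSpec = StateSpec.function(
      (key: String, value: Option[Int], state: State[Int]) => {
        // After recovery from the checkpoint this is where the
        // ClassCastException is thrown.
        kafkaWriter.value.someMethodCall()
        val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)
        (key, sum)
      })

    // A socket stream stands in for the real Kafka source.
    ssc.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .mapWithState(stateSpec)
      .print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart this rebuilds the DStream graph (including the closure
    // that captured the broadcast) from the checkpoint instead of
    // calling createContext again.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}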

When executing

kafkaWriter.value.someMethodCall()

I get the following error:

17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4) 
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77) 
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181) 
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55) 
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Basically, kafkaWriter is the broadcast variable, and kafkaWriter.value should give us back the object we broadcast, but instead it returns a SerializableConfiguration, which cannot be cast to the desired object.

Thanks in advance for the help!

+0

Why do you need the 'KafkaWriter' inside 'mapWithState'? Could you make the call before updating the state? Perhaps something that runs inside 'mapPartitions'? By the way, your example seems to have a copy/paste error, since some of the code appears twice. –

+0

Thanks for the reply, Yuval. This is a dummy example just to reproduce the issue. In our real use case we have to fetch some data from a db via a jdbc call, which we use to update the state, so we have to pass the broadcast into mapWithState. Also, if you mean that SparkStateRunner and SparkStateRunnerWithBroadcast look duplicated: the former does not pass the broadcast into mapWithState, while the latter does. – Saman

+0

I see. Have you considered invoking the JDBC driver before the call to 'mapWithState'? –

Answer

0

Broadcast variables cannot be used with mapWithState (or with transformation operations in general) if we need to recover from a checkpoint directory in Spark Streaming. In that case they can only be used inside output operations, because the broadcast has to be lazily initialized from the Spark context:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaWordBlacklist.class) {
                if (instance == null) {
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
                    instance = jsc.broadcast(wordBlacklist);
                }
            }
        }
        return instance;
    }
}

class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaDroppedWordsCounter.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
                }
            }
        }
        return instance;
    }
}

wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist =
        JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter =
        JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
            droppedWordsCounter.add(wordCount._2());
            return false;
        } else {
            return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
});
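The same pattern carries over to the question's KafkaWriter. A minimal Scala sketch (reusing the placeholder KafkaWriter, stream and stateSpec from the snippet in the question): keep mapWithState free of broadcast references, and fetch the writer lazily inside foreachRDD.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily (re)creates the broadcast from the live SparkContext, so it
// also works after the streaming context has been recovered from a
// checkpoint.
object KafkaWriterHolder {
  @volatile private var instance: Broadcast[KafkaWriter] = _

  def getInstance(sc: SparkContext): Broadcast[KafkaWriter] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(new KafkaWriter("localhost:9092"))
        }
      }
    }
    instance
  }
}

// The state update function no longer captures a broadcast; the writer
// is used only inside the output operation.
stream.mapWithState(stateSpec).foreachRDD { rdd =>
  val writer = KafkaWriterHolder.getInstance(rdd.sparkContext)
  rdd.foreachPartition { partition =>
    partition.foreach(_ => writer.value.someMethodCall())
  }
}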