2016-03-02 82 views
0

我使用hbase-spark在我的spark-streaming项目中记录了pv/uv。然后,当我杀了应用程序,并重新启动它,我有以下异常,而检查点恢复:是否有可能从Spark-streaming检查点恢复广播值

16/03/02 10点17分21秒ERROR HBaseContext:从广播 java.lang.ClassCastException无法getConfig: [B不能转换为com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645)org.apache.spark.SerializableWritable com.paitao.xmlife.contrib.hbase.HBaseContext.com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ hbaseForeachPartition(HBaseContext.scala:627) at com.paitao.xmlife.contrib.hbase.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457) at com.paitao.xmlife.contrib.hba se.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD.scala:898) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD.scala:898) at org.apache .spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1839) at org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1839) at org.apache.spark .scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor .scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecut or.java:1142) 在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

我检查HBaseContext的代码,它使用广播来存储HBase配置。

class HBaseContext(@transient sc: SparkContext, 
       @transient config: Configuration, 
       val tmpHdfsConfgFile: String = null) extends Serializable with Logging { 

    @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() 
    @transient var tmpHdfsConfiguration: Configuration = config 
    @transient var appliedCredentials = false 
    @transient val job = Job.getInstance(config) 

    TableMapReduceUtil.initCredentials(job) 
    // <-- broadcast for HBaseConfiguration here !!! 
    var broadcastedConf = sc.broadcast(new SerializableWritable(config)) 
    var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials())) 
    ... 

当检查点恢复,它试图在其getConf FUNC访问此广播值:

if (tmpHdfsConfiguration == null) { 
    try { 
    tmpHdfsConfiguration = configBroadcast.value.value 
    } catch { 
    case ex: Exception => logError("Unable to getConfig from broadcast", ex) 
    } 
} 

然后发生异常。我的问题是:是否有可能从火花应用程序中的检查点恢复广播值?我们还有其他一些解决方案重播恢复后的值?

感谢您的任何反馈!

回答

1

目前,这是Spark的一个已知的bug。投稿人一直在调查这个问题,但没有取得任何进展。

这是我的解决方法:我不是将数据加载到广播变量并向所有执行者广播,而是让每个执行者将数据本身加载到单例对象中。

顺便说一句,跟着这个问题更改https://issues.apache.org/jira/browse/SPARK-5206

+0

谢谢,它的工作原理,当我初始化singleton对象 –

+0

@伊轩范文及@He白..你能解释一下你的解决方案。如果我在Map函数中如果没有访问Spark上下文,那么executor如何将数据本身加载到单例对象中? – metsathya

相关问题