我正在学习Spark Streaming。我想保持状态更新,并能够使用mapWithState更新状态。我还在上下文中启用了检查点。如果我不得不停止/重新开始这项工作,我想记住这个国家。现在每次重启都会重新开始计数。我尝试了各种现金,检查点选项,并通过大量发布进行扫描,但没有获得清晰的图像。如何记住Spark Streaming应用程序重新启动之间的状态?
环境: 我在开发中本地运行Spark,也作为HDP沙箱运行。 (我在两种环境中都尝试过)。
是否有可能记住你杀死Spark作业并重新启动它的状态。 (没有任何编程改变)。
如果可能怎么办?任何指针或建议都会有所帮助。 (我尝试了chekpoints,缓存在单独的RDD上,MapwithStateRDD在本地和HDP sanbox上)。
我没有尝试的唯一选择是将MapWithStateRDD保存到磁盘并将其读回为initialRDD。无论如何,不会觉得这是正确的选择。
我只发现了一个没有答案的类似问题。 Spark Checkpoint doesn't remember state (Java HDFS)
谢谢。
代码:
def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = {
val creatingFunc =() => streamingApp(sc, batchDuration)
val ssc = sc.getCheckpointDir match {
case Some(checkpointDir) =>
println("Get or Create Context")
StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true)
case None =>
print("New Context")
StreamingContext.getActiveOrCreate(creatingFunc)
}
sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
println(ssc.getState())
ssc
}
星火版本2.1.0
使用点校验,但你说你启用它,所以...你可以显示代码初始化StreamingContext并设置检查点?什么是Spark版本? –
谢谢你的快速回复Jacek。 – user2022329
btw:我还尝试了一个课程材料的练习,它具有完整的检查点代码,并且表现相同。我想我可能不得不将状态保存到HDF并在重新启动时初始化。 – user2022329