2017-05-26 38 views
1

我正在学习Spark Streaming。我想保持状态更新,并能够使用mapWithState更新状态。我还在上下文中启用了检查点。如果我不得不停止/重新开始这项工作,我想记住这个国家。现在每次重启都会重新开始计数。我尝试了各种现金,检查点选项,并通过大量发布进行扫描,但没有获得清晰的图像。如何记住Spark Streaming应用程序重新启动之间的状态?

环境: 我在开发中本地运行Spark,也作为HDP沙箱运行。 (我在两种环境中都尝试过)。

  1. 是否有可能记住你杀死Spark作业并重新启动它的状态。 (没有任何编程改变)。

  2. 如果可能怎么办?任何指针或建议都会有所帮助。 (我尝试了chekpoints,缓存在单独的RDD上,MapwithStateRDD在本地和HDP sanbox上)。

  3. 我没有尝试的唯一选择是将MapWithStateRDD保存到磁盘并将其读回为initialRDD。无论如何,不​​会觉得这是正确的选择。

我只发现了一个没有答案的类似问题。 Spark Checkpoint doesn't remember state (Java HDFS)

谢谢。

代码:

def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = { 
    val creatingFunc =() => streamingApp(sc, batchDuration) 
    val ssc = sc.getCheckpointDir match { 
     case Some(checkpointDir) => 
     println("Get or Create Context") 
     StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true) 
     case None => 
     print("New Context") 
     StreamingContext.getActiveOrCreate(creatingFunc) 
    } 
    sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp)) 
    println(ssc.getState()) 
    ssc 
    } 

星火版本2.1.0

+0

使用点校验,但你说你启用它,所以...你可以显示代码初始化StreamingContext并设置检查点?什么是Spark版本? –

+0

谢谢你的快速回复Jacek。 – user2022329

+0

btw:我还尝试了一个课程材料的练习,它具有完整的检查点代码,并且表现相同。我想我可能不得不将状态保存到HDF并在重新启动时初始化。 – user2022329

回答

1

我得到它的工作...感谢以下Q/A。 [链接](Spark streaming not remembering previous state

我是缺少以下行updateStateByKey后

statefulActivity.checkpoint(分钟(1))

与启用检查点目录一起添加的持续时间记住在重新启动状态。

相关问题