我使用Spark流来计算唯一用户。我使用updateStateByKey
,所以我需要配置一个检查点目录。我也是从关卡加载数据,同时启动应用程序,如the example in the doc:如何配置检查点以重新部署火花流应用程序?
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
这里是个问题,如果我的代码更改,然后我重新部署的代码,将在检查站被加载不管多少代码改变了?或者我需要使用自己的逻辑来持久化数据并在下一次运行中加载它们。
如果我使用我自己的逻辑来保存和加载DStream,那么如果应用程序重新启动失败,将不会从检查点目录和我自己的数据库加载数据?