2015-09-17 25 views
5

我使用Spark流来计算唯一用户。我使用updateStateByKey,所以我需要配置一个检查点目录。我也是从关卡加载数据,同时启动应用程序,如the example in the doc如何配置检查点以重新部署火花流应用程序?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

这里是个问题,如果我的代码更改,然后我重新部署的代码,将在检查站被加载不管多少代码改变了?或者我需要使用自己的逻辑来持久化数据并在下一次运行中加载它们。

如果我使用我自己的逻辑来保存和加载DStream,那么如果应用程序重新启动失败,将不会从检查点目录和我自己的数据库加载数据?

回答

3

检查点本身包含元数据,RDD,天甚至你logic.If你改变你的逻辑,并尝试从最后一个检查点运行它,你很可能打一个例外。 如果你想用你自己的逻辑来的地方作为检查点保存你的数据,你可能需要实现的火花行动,您的检查点数据推到任何数据库,在接下来的运行,加载检查点数据作为初始RDD(万一你正在使用updateStateByKey API)并继续你的逻辑。

2

我在Spark邮件列表中提出了这个问题,并得到了答案,我在my blog上分析过它。我会在这里发表总结:

的方法是同时使用检查点和我们自己的数据加载机制。但是我们将我们的数据加载为updateStateByKeyinitalRDD。因此,在这两种情况下,数据既不会丢失也不会重复:

  1. 当我们改变了代码,并重新部署星火应用程序,我们安全关机老星火应用和清理检查点的数据,所以只能加载数据我们保存的数据。

  2. 当火花应用是失败并重新启动,它将从检查点加载数据。但是DAG的步骤已保存,因此不会再将我们自己的数据加载到initalRDD中。所以唯一加载的数据是检查点数据。

相关问题