2017-05-29 41 views
0

我们正在尝试构建容错火花流作业,但我们遇到了一个问题。下面是我们的情景:Spark Streaming - 从检查点重新开始重播上一批

1) Start a spark streaming process that runs batches of 2 mins 
    2) We have checkpoint enabled. Also the streaming context is configured to either create a new context or build from checkpoint if one exists 
    3) After a particular batch completes, the spark streaming job is manually killed using yarn application -kill (basically mimicking a sudden failure) 
    4) The spark streaming job is then restarted from checkpoint 

,我们遇到的问题是,火花流作业重新启动后,重播最后成功一批。它总是这样做,只是最后一个成功的批次被重播,而不是早期批次

这样做的副作用是该批次的数据部分是重复的。我们甚至试图在最后一次成功的批处理之后等待超过一分钟,然后再杀死进程(以防写入检查点需要时间),但这并没有帮助

有什么见解?我没有在这里添加代码,希望有人也面对这个问题,并可以提供一些想法或见解。如果有帮助,也可以发布相关代码。不应该在成功的批处理后立即触发流式检查点,以便在重新启动后不重播?我放置ssc.checkpoint命令的位置有关系吗?

回答

2

你在问题的最后一行有答案。 ssc.checkpoint()的位置很重要。当您使用保存的检查点重新启动作业时,作业会随着所保存的内容一起提供。因此,在你的情况下,当你在批次完成后杀死了工作,最近的一个是最后一个成功的。此时,您可能已经了解,检查点主要是从您离开的地方取回 - 尤其是失败的工作。

+0

谢谢Ramzy。“到这个时候,你可能有understo检查点主要是从你离开的地方取回 - 尤其是失败的工作“。是的,这是有道理的。这是我没有得到的,当我杀死火花作业时,最后一个状态是一个成功的批处理。为什么该批次在重新启动后重播?这是一个限制或什么需要改变我的检查点设置的方式? – Shay

+0

理想情况下,预计流式传输将持续运行。因此,如果在作业之间发生某些事情,检查点应确保它在停止的地方拾起。关于重复 - 这可以由检查点()方法的位置以及如何处理下游应用程序中的重复项决定。最后,你将失败或重新整批地丢失一些数据。但是因为我们正在谈论流媒体,所以这些情况很少见。您的情况非常棘手,因为您正在使用流式传输来停止和频繁启动 – Ramzy

0

有两件事需要注意。

1]确保在重新启动程序时,在getOrCreate流式上下文方法中使用相同的检查点目录。

2]将“spark.streaming.stopGracefullyOnShutdown”为‘true’。这让火花来完成处理当前的数据和更新检查点目录因此,如果设置为false,它可能会导致在检查点目录中出现损坏的数据。

注意:请张贴代码片段,如果可能的是,ssc.checkpoint的位置确实重要

0

在这种情况下,应确保在流上下文方法使用该检查点目录星火重启后是相同的应用程序。希望它会帮助

相关问题