2015-07-19 84 views
12

我想了解如何使Spark Streaming应用程序更容错(特别是在尝试写入下游依赖项时),并且我不知道最佳方法是处理尝试将结果写入外部源(如Cassandra,DynamoDB等)的失败。Apache Spark Streaming,如何处理下游依赖失败

例如,我有一个Spark Streaming作业,用于从Stream(Kafka,Flume等)中提取数据...我没有'最终确定使用哪种技术),将相似的项目汇总在一起,然后将结果写入外部商店。 (即Cassandra,DynamoDB或任何正在接收DStream计算的结果)。

我想弄清楚如何处理外部依赖不可写入的情况。也许集群发生故障,可能存在许可问题等,但我的工作无法将结果写入外部依赖关系。是否有办法暂停Spark Streaming,以便接收器不会继续批量处理数据?我是否应该睡眠当前批次并让Receiver继续存储批次?如果问题是暂时的(几秒钟),继续批处理可能是可以接受的,但是如果依赖关系下降几分钟或1+小时以上会发生什么?

我想过的一个观点是有一个监视进程,在后台监视依赖关系的健康状况,如果发现它是“不健康的”,它会停止工作。然后,当所有依赖关系都健康时,我可以开始备份作业并处理所有未写入外部源的数据。

我的另一个想法是以某种方式在DStream forEachRdd方法中发出信号,表示存在问题。是否有某种例外,我可以扔在DStream中,它会发回给驱动程序它应该停止?

如果任何人有任何关于如何处理External Fault Tolerance的经验,或者可以指向我的好文章/视频,那就太好了。

感谢

+2

我不确定,但下游数据接收器/存储器本身应该处理故障吗?担心这一点超出了Spark的责任。此外,如果发生故障,则更多的是监控和警告,以便工程师可以得到通知并立即检查故障。 – keypoint

回答

1

我相信这里没有简单和普遍的答案。很大程度上取决于应用程序语义,数据源的类型(可靠的接收器,可靠的接收器,基于文件,无接收器)和要求。

一般而言,您绝对不应该让应用程序故障处理单个IO故障。假设你有一些行动:

outputAction[T](rdd: RDD[T]): Unit = ??? 

至少要确保它不会传播异常到您的驱动程序。

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { rdd => Try(outputAction(rdd)) } 

问题依然存在。你可以做的最简单的事情就是放弃给定的窗口。取决于应用程序,它可以是可接受的解决方案或不是,但通常有很多情况下,丢失一些数据是完全可以接受的。

通过记录故障并采取一些其他措施(如果达到某个阈值)可以进一步改进。

如果丢弃数据是不能接受的下一个步骤是一些延迟之后重试:

outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) } 
    .recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) } 
    ... 
} 

重试的次数和延迟持续时间将有所不同情况而和depnds源和存储输入数据的能力上。

你可以做什么,当我们打到最后重试?对于初学者,我们可以添加一个替代输出源。例如,您可以将所有内容都推送到可靠的外部文件存储,并在以后担心。如果输出源需要输入数据的特定顺序,则这可能不适用,否则应值得尝试。

alternativeOutputAction[T](rdd: RDD[T]) = ??? 

stream foreachRDD { 
    rdd => Try(outputAction(rdd)) 
    .recoverWith { case _ => Try(outputActionWithDelay(d1) 
    ... 
    .recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) } 
    .recoverWith { case _ => Try(alternativeOutputAction(rdd)) 
} 

如果失败,它可能是严重的问题的症状,并没有太多我们可以在应用层做。我们可以回到第一种方法,只是希望情况很快得到解决或选择更复杂的方法。

如果输入源可以缓冲数据并且我们使用可靠的存储和复制那么我们可以enable checkpointing和简单地杀死该应用程序。

如果尝试恢复,添加CircuitBreaker的某些变体可能是一个不错的主意,如果应用程序遇到多个失败尝试无延迟地尝试到达主输出丢失恢复尝试。

1

现在我用直接流和自行保存偏移。这可能无法解决您的问题,至少一旦您发现外部存储的某些问题,您可以从停止的位置重新启动。