2015-10-29 48 views
0

我是Spark Streaming和Spark的新用户。在我的测试中,我注意到流中的单个错误会导致整个流应用程序失败。为什么在异常情况下Spark会杀死驱动程序进程?

为了更加清楚,让我以一个例子来解释。假设提交的应用程序使用像15, 10, 21, 12, ...这样的整数流。但是,这个流可能会错误地携带一些非整数字符串(例如15, 10, 21, 12, foo, 32, ...)。如果我的代码有错误并假定所有数据项都是整数,那么它将在处理流中的foo字符串时抛出异常。

在这种情况下,Spark引擎默认重试任务3次(您可以检查config documentation参数spark.task.maxFailures)。然后,在所有不成功的尝试之后,它会中止驱动程序进程并因此中止执行程序。

我不确定这是否是正确的行为。相反,我认为当前的任务(即部分数据的执行)或批处理(即从流中读取的一堆数据项)将被丢弃。所以,驱动程序会处理剩余的流。

任何想法为什么Spark的行为如此?是否有任何配置来强制引擎忽略故障并继续?

顺便说一下,我在独立模式下使用Spark。 YARN或Mesos可以为此提供帮助吗?

在此先感谢。

+0

我读它作为一个字符串列和手动处理不正确的数据。 – Reactormonk

回答

2

如果Spark失败了,你会怎么知道出了什么问题? (YARN或Mesos对此没有帮助。)

与Reactormonk建议的一样,您应该指定如何将失败作为传递给Spark的函数的一部分来处理。如果你只是想放弃错误的行和你使用Scala中,你可能想要做像下面这样:

val strRDD = sc.parallelize(Array("15", "10", "21", "12", "foo", "32"),1) 
val intRDD = strRDD.flatMap(x => try{Some(x.toInt)} catch {case e: Exception => None}) 
intRDD.collect() 

这将返回Array[Int] = Array(15, 10, 21, 12, 32)

+0

确定手动处理,但我需要了解Spark是否具有内置的支持,可以默默跳过错误的执行。它可能会通过侦听器API通知我失败的任务,我可以说忽略这个并继续。 – ovunccetin

相关问题