2016-12-27 102 views
2

我想停止Spark重试Spark应用程序以防发生某些特定异常。如果满足某些条件,我只想限制重试次数。否则,我需要默认的重试次数。如何控制每个任务/舞台/作业尝试的Spark应用程序?

请注意,Spark应用程序只运行一个Spark作业。

我试着在例外的情况下设置javaSparkContext.setLocalProperty("spark.yarn.maxAppAttempts", "1");,但仍然重试整个工作。

我提交星火应用程序如下:

spark-submit --deploy-mode cluster theSparkApp.jar 

我有,我想删除,如果它是由同一作业的先前重试创建的输出的使用情况,但未能如果工作输出文件夹不是空的(在第1次重试中)。你能想出任何其他方式来实现这一目标吗?

+0

你如何提交部署你的星火应用程序?什么是使用的命令行选项和Spark属性?顺便说一句,即使你说你的意思是“整个Spark应用程序”,你仍然使用“整个工作仍在重试”。一个Spark应用程序可以运行/提交一个或多个Spark作业。 –

+0

您可否使用'spark-submit --deploy-mode cluster --conf spark.yarn.maxAppAttempts = 1'(并在命令行上使用Spark设置)。 –

回答

1

我有一个用例,如果输出文件夹不是空的(在第一次重试时),那么我想要删除输出,如果它是由同一作业的上一次重试创建的,但失败。你能想出任何其他方式来实现这一目标吗?

您可以使用TaskContext来控制你的星火工作的行为方式给出,比如说,重试次数如下:

val rdd = sc.parallelize(0 to 8, numSlices = 1) 

import org.apache.spark.TaskContext 

def businessCondition(ctx: TaskContext): Boolean = { 
    ctx.attemptNumber == 0 
} 

val mapped = rdd.map { n => 
    val ctx = TaskContext.get 
    if (businessCondition(ctx)) { 
    println("Failing the task because business condition is met") 
    throw new IllegalArgumentException("attemptNumber == 0") 
    } 
    println(s"It's ok to proceed -- business condition is NOT met") 
    n 
} 
mapped.count 
+0

这里的问题是,我不知道我的工作是因为businessCondition()还是由于其他原因(除非我在Spark之外的某个地方保持此状态,因此我想避免)而未能通过第一次重试。因此,我能想到的唯一可能的方式是在满足businessCondition()的情况下强制Spark失败,而不进行任何重试。 – user401445

+0

addTaskCompletionListener(listener:TaskCompletionListener):TaskContext'和'addTaskFailureListener(listener:TaskFailureListener):TaskContext'?以前从未使用过它们,但它们看起来好像在这里可能会有所帮助。 –

+0

'onApplicationEnd'可以工作,但是无法在SparkListener中获取TaskContext。我需要TaskContext来确定究竟要删除什么。另外,我不确定是否可以在侦听器界面中查找应用程序是成功还是失败。 – user401445

相关问题