2016-06-23

I want to be able to produce the table of per-task metrics, like those gathered on the Spark UI when you visit a particular stage, and in particular to find the scheduler delay in Spark.

One of the columns is Scheduler Delay, which cannot be found in any of the REST APIs Spark provides. All the other columns show up when I browse /api/v1/applications/[app-id]/stages/[stage-id]/[attempt]/taskList.

How is scheduler delay calculated, and is there a way to pull this data out without scraping the Spark UI web pages?

Answers


Correct, scheduler delay is not provided in the history API. For the UI it is computed as follows:

private[ui] def getSchedulerDelay(
    info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
  if (info.finished) {
    val totalExecutionTime = info.finishTime - info.launchTime
    val executorOverhead = metrics.executorDeserializeTime +
      metrics.resultSerializationTime
    math.max(0, totalExecutionTime - metrics.executorRunTime -
      executorOverhead - getGettingResultTime(info, currentTime))
  } else {
    // The task is still running and the metrics like executorRunTime are not available.
    0L
  }
}

See https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala, around line 770.
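As a minimal sketch, the same formula can be reproduced from the per-task timing fields, assuming you can obtain them for a finished task (e.g. from the taskList REST endpoint; exact field availability varies by Spark version, and `gettingResultTime` in particular may have to be approximated or taken as 0):

```scala
// Sketch: the UI's scheduler-delay formula as a standalone function.
// All inputs are milliseconds; names mirror the Spark source but this
// is an approximation, not Spark's own class.
object SchedulerDelay {
  def schedulerDelay(
      duration: Long,                 // finishTime - launchTime
      executorRunTime: Long,
      executorDeserializeTime: Long,
      resultSerializationTime: Long,
      gettingResultTime: Long): Long = {
    val executorOverhead = executorDeserializeTime + resultSerializationTime
    // Clamp at 0, as the UI does, in case clock skew makes the result negative.
    math.max(0L, duration - executorRunTime - executorOverhead - gettingResultTime)
  }

  def main(args: Array[String]): Unit = {
    // A 1000 ms task: 850 ms running, 50 ms deserializing,
    // 20 ms serializing the result, 30 ms fetching it back.
    println(schedulerDelay(1000L, 850L, 50L, 20L, 30L)) // 50
  }
}
```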


At least as of Spark 1.6, if you are looking for the scheduling delay of a Spark Streaming batch, you can look at the Spark Streaming UI source code, which computes it as follows.

It uses the class BatchUIData, where the scheduling delay is defined as:

/** 
* Time taken for the first job of this batch to start processing from the time this batch 
* was submitted to the streaming scheduler. Essentially, it is 
* `processingStartTime` - `submissionTime`. 
*/ 
def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
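The same computation can be sketched outside Spark, assuming you record the two timestamps yourself (the case class below mirrors the shape of `BatchUIData` but is a hypothetical stand-in, not Spark's class):

```scala
// Sketch: scheduling delay of a batch as processingStartTime - submissionTime.
// Both timestamps are epoch milliseconds.
case class BatchTimes(submissionTime: Long, processingStartTime: Option[Long]) {
  // None while the batch is still queued; Some(delay in ms) once the
  // first job of the batch has started processing.
  def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
}

object BatchTimesDemo {
  def main(args: Array[String]): Unit = {
    println(BatchTimes(1000L, Some(1250L)).schedulingDelay) // Some(250)
    println(BatchTimes(1000L, None).schedulingDelay)        // None
  }
}
```

Using `Option[Long]` matches Spark's own choice: a batch that has not started processing has no meaningful delay yet, rather than a delay of 0.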