2016-12-22 54 views
0

在使用Spark RDD时,我知道每次使用ShuffleRDD都会创建一个新阶段,但是当我们有多个动作时会创建一个新阶段?Spark DAG阶段数

例子:

val rdd1 = sc.textFile("<some_path").keyBy(x=>x.split(",")(1)) 

val rdd2 = sc.textFile("<some_path").keyBy(x=>x.split(",")(1)) 

val rdd3 = rdd1.join(rdd2) 

rdd3.filter(x=><somecondition1>).saveAsTextFile("location1") 
rdd3.filter(x=><somecondition2>).saveAsTextFile("location2") 

现在1阶段将涉及到RDD1集,RDD2和rdd3任务,那么2阶段将有两种保存措施?

回答

0

实际上几个月前我问了一个类似的问题here

在你的情况下,rdd3调用一个转换。所以当你声明rdd3时,创建rdd1和rdd2的操作就会发生。随后的转换会在每次保存(特别是过滤)时发生,但rdd1和rdd2不会再次作为操作运行。

如果您在运行保存之前缓存了数据,则会产生类似的效果。

我不知道您使用的是哪个版本的Spark,但是您可以从文档here中找到相关信息。至少1.6+是一样的。

0

Stage2只有一个保存操作。

在你的代码saveAsTextFile是一个动作,它将调用spark来计算你的rdd沿袭。换句话说,spark只会执行这个代码,直到找到saveAsTextFile。然后阶段和任务将被创建并提交给执行者。

由于您的代码有两个saveAsTextFile s,并且您从未缓存过任何中间rdds,所以在这种情况下rdd1,rdd2,rdd3将被计算两次。

舞台是Job内的一个概念,一个动作调用一个工作,所以没有办法在哪个舞台包含两个动作。