2016-06-11 85 views
12

我有一个火花任务,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回Hadoop。我注意到当我这样做时,有几百个任务。为什么我的火花任务有这么多任务?

我也不确定为什么有这样的多个工作?我觉得工作更像是什么时候发生的事情。我可以推测为什么 - 但我的理解是,在这个代码中,它应该是一项工作,它应该分解成多个阶段,而不是多个工作。为什么它不把它分解成几个阶段,它怎么分解成工作?

至于200个加任务,因为数据量和节点的量是微乎其微的,它没有任何意义,有像25个任务每行数据时,只有一个聚合和几个过滤器。为什么每个分区每个原子操作只有一个任务?

下面是相关Scala代码 -

import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object TestProj {object TestProj { 
    def main(args: Array[String]) { 

    /* set the application name in the SparkConf object */ 
    val appConf = new SparkConf().setAppName("Test Proj") 

    /* env settings that I don't need to set in REPL*/ 
    val sc = new SparkContext(appConf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 

    /*the below rdd will have schema defined in Record class*/ 
    val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 
         .map(x=>x.split(" ")) //file record into array of strings based spaces 
         .map(x=>Record(
            x(0).toInt, 
            x(1).asInstanceOf[String], 
            x(2).asInstanceOf[String], 
            x(3).toInt 
            )) 


    /* the below dataframe groups on first letter of first name and counts it*/ 
    val aggDF = rddCase.toDF() 
         .groupBy($"firstName".substr(1,1).alias("firstLetter")) 
         .count 
         .orderBy($"firstLetter") 

    /* save to hdfs*/ 
    aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg") 

    } 

    case class Record(id: Int 
        , firstName: String 
        , lastName: String 
        , quantity:Int) 

} 

下面是截图点击应用后 enter image description here

下面是查看ID 0的特定的“工作”时,阶段表现 enter image description here

以下是屏幕的第一部分,当点击超过200个任务的舞台时

enter image description here

这是舞台enter image description here

下面屏幕里的第二部分是点击“执行者”标签后 enter image description here

按照要求,这里有招聘ID阶段1

enter image description here

这里是第E详细为作业ID 1阶段200个任务

enter image description here

回答

17

这是一个经典的Spark问题。

用于读取的两个任务(第二个图中的阶段ID 0)是defaultMinPartitions设置,设置为2.您可以通过读取REPL sc.defaultMinPartitions中的值来获取此参数。它也应该可以在Spark UI的“环境”下点击。

你可以看看github上的code,看看到底发生了什么。如果您想在读取时使用更多的分区,只需将其添加为参数,例如sc.textFile("a.txt", 20)

现在有趣的部分来自于第二阶段出现的200个分区(第二个图中的阶段Id 1)。那么,每次洗牌时,Spark都需要决定洗牌RDD有多少个分区。你可以想象,默认值是200。如果你有这个配置,你会看到,200个分区不会是有什么比较运行代码

sqlContext.setConf("spark.sql.shuffle.partitions", "4”) 

你可以改变使用。如何设置这个参数是一种艺术。也许选择两倍的核心数量(或其他)。

我认为Spark 2.0有一种方法可以自动推断洗牌RDD的最佳分区数量。期待!

最后,您得到的工作量与产生的优化的Dataframe代码导致的多少RDD操作有关。如果您阅读Spark规范,则说明每个RDD操作都会触发一项工作。当您的操作涉及Dataframe或SparkSQL时,Catalyst优化器将找出执行计划并生成一些基于RDD的代码来执行它。很难说出为什么它在你的情况下使用两个动作。您可能需要查看优化的查询计划,以确切了解正在执行的操作。

+0

感谢的人!我会立即做这件事检查出来。那么多重工作呢?为什么有两份工作? –

+1

你有没有作业ID 1阶段的屏幕? – marios

+0

我将它们添加到OP –

1

我有类似的问题。但在我的场景中,我并行化的集合的元素少于Spark计划的任务数量(导致Spark有时会出现奇怪行为)。使用强制分区号我能够解决这个问题。

它是这样的:

collection = range(10) # In the real scenario it was a complex collection 
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario 

然后,我在Spark日志看到:

INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks