我有一个火花任务,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回Hadoop。我注意到当我这样做时,有几百个任务。为什么我的火花任务有这么多任务?
我也不确定为什么有这样的多个工作?我觉得工作更像是什么时候发生的事情。我可以推测为什么 - 但我的理解是,在这个代码中,它应该是一项工作,它应该分解成多个阶段,而不是多个工作。为什么它不把它分解成几个阶段,它怎么分解成工作?
至于200个加任务,因为数据量和节点的量是微乎其微的,它没有任何意义,有像25个任务每行数据时,只有一个聚合和几个过滤器。为什么每个分区每个原子操作只有一个任务?
下面是相关Scala代码 -
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt
))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
以下是屏幕的第一部分,当点击超过200个任务的舞台时
按照要求,这里有招聘ID阶段1
这里是第E详细为作业ID 1阶段200个任务
感谢的人!我会立即做这件事检查出来。那么多重工作呢?为什么有两份工作? –
你有没有作业ID 1阶段的屏幕? – marios
我将它们添加到OP –