考虑具有相对大量聚集的和相对较多的组运行GROUP BY
的下面的例子:与火花SQL组围绕性能和内存问题的工作BY
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
val h = new HiveContext(sc)
import h.implicits._
val num_columns = 3e3.toInt
val num_rows = 1e6.toInt
val num_groups = 1e5.toInt
case class Data(A: Long = (math.random*num_groups).toLong)
val table = (1 to num_rows).map(i => Data()).toDF
val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i")
table.registerTempTable("table")
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a")
// Write the result to make sure everyting is executed
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet")
此作业的输入只有8MB,输出大约为2.4GB,我在一台集群上运行,每个集群有三台工作台,每台61GB内存。结果:所有员工都会因OutOfMemory异常而崩溃。 即使的值较低,由于GC开销,作业变得不合理地变慢。
的事情,我们尝试包括:
- 减小分区大小(减少了内存占用,但增加了簿记开销)
- 预先划分做聚合(之前用HashPartitioner数据减少内存消耗但在任何实际工作发生之前需要全面重新洗牌)
是否有更好的方法来达到预期效果?
不是一个答案,但每个工人的61GB堆非常大,并会导致非常长的GC暂停。最好是让更多的小型工作人员或者将每台服务器拆分为运行多个执行程序并减少堆栈以减少GC暂停影响。 – Rich
“减少分区大小”是指调整'spark.sql.shuffle.partitions'?如果没有,你可以尝试增加这个数字,默认值是200. – Rich
@确实,这是我用过的参数。它解决了记忆问题,但我经常发现自己被迫将其增加到不合理的高价值,这又因记账开销而影响绩效。 – DanielM