2015-05-19 9 views
10

考虑具有相对大量聚集的和相对较多的组运行GROUP BY的下面的例子:与火花SQL组围绕性能和内存问题的工作BY

import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.SparkContext._ 
val h = new HiveContext(sc) 
import h.implicits._ 

val num_columns = 3e3.toInt 
val num_rows = 1e6.toInt 
val num_groups = 1e5.toInt 

case class Data(A: Long = (math.random*num_groups).toLong) 

val table = (1 to num_rows).map(i => Data()).toDF 

val aggregations = (1 to num_columns).map(i => s"count(1) as agg_$i") 
table.registerTempTable("table") 
val result = h.sql(s"select a, ${aggregations.mkString(",")} from table group by a") 

// Write the result to make sure everyting is executed 
result.save(s"result_${num_columns}_${num_rows}_${num_groups}.parquet", "parquet") 

此作业的输入只有8MB,输出大约为2.4GB,我在一台集群上运行,每个集群有三台工作台,每台61GB内存。结果:所有员工都会因OutOfMemory异常而崩溃。 即使​​的值较低,由于GC开销,作业变得不合理地变慢。

的事情,我们尝试包括:

  • 减小分区大小(减少了内存占用,但增加了簿记开销)
  • 预先划分做聚合(之前用HashPartitioner数据减少内存消耗但在任何实际工作发生之前需要全面重新洗牌)

是否有更好的方法来达到预期效果?

+0

不是一个答案,但每个工人的61GB堆非常大,并会导致非常长的GC暂停。最好是让更多的小型工作人员或者将每台服务器拆分为运行多个执行程序并减少堆栈以减少GC暂停影响。 – Rich

+0

“减少分区大小”是指调整'spark.sql.shuffle.partitions'?如果没有,你可以尝试增加这个数字,默认值是200. – Rich

+0

@确实,这是我用过的参数。它解决了记忆问题,但我经常发现自己被迫将其增加到不合理的高价值,这又因记账开销而影响绩效。 – DanielM

回答

2

一般来说,对于像这样的问题几乎通用的解决方案是将分区大小保持在合理的大小。虽然“合理”是稍微主观的,并且可以因个案而异100-200MB看起来是一个很好的起点。

我可以轻松地将单个工作人员提供的示例数据保留为默认值spark.executor.memory(1GB),并将总可用资源限制为8个内核和8GB RAM。所有这些都是通过使用50个分区并保持聚合时间在3秒左右而没有任何特殊的调整(这在1.5.2到2.0.0之间或多或少一致)。

所以总结一下:或者增加spark.default.parallelism或者在创建DataFrame时明确设置分区数量(如果可能的话)。对于像这样的小数据集,默认spark.sql.shuffle.partitions应该足够了。

-1

由于我不确定你正在使用什么样的聚合函数,所以很难说在后台有什么火花。在任何情况下,为了对每个聚合函数进行更多的控制,我会为基本RDD本身上的每一个运行reduceByKey转换。然后,您可以根据需要简单地加入结果。通过这种方式,您可以更好地控制,并且可以查看哪个聚合“成本”最高,而且您可以通过除了混洗之外的操作来避免该组,从而导致内存问题(由于整套数据转换为单个分区)。下面是一个简短的例子,其中aggrigationFunctions是你的聚合函数的列表,它们的ID和实际函数(元组列表)。

val aggrigationResults = aggrigationFunctions.map( 
    f => { 
    val aggRes = baseRdd 
        .map(x => (x.[the field to group by], x.[the value to aggrigate])) 
        .reduceByKey(f.func) 
    (f.id, aggRes) 
    } 
) 
+1

您正在将'groupByKey'(来自_Avoid ..._的那个)与'Dataset'操作混合在一起。这些事情是不一样的。换句话说'Dataset.groupBy'!='RDD.groupBy'。 – zero323