2017-01-03 25 views
4

甲函数应为多个列中的数据帧火花迁移SQL窗函数RDD为更好的性能

def handleBias(df: DataFrame, colName: String, target: String = target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 

这可以很好地被写为在火花SQL如上所示被执行,并且一个for循环。然而这导致了很多洗牌(spark apply function to columns in parallel)。

小例子:

val df = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

    val columnsToDrop = Seq("col3TooMany") 
    val columnsToCode = Seq("col1", "col2") 
    val target = "TARGET" 

    val targetCounts = df.filter(df(target) === 1).groupBy(target) 
    .agg(count(target).as("cnt_foo_eq_1")) 
    val newDF = df.join(broadcast(targetCounts), Seq(target), "left") 

    val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) { 
    (currentDF, colName) => handleBias(currentDF, colName) 
    } 

    result.drop(columnsToDrop: _*).show 

我怎样才能制定这个更有效使用RDD API? aggregateByKey应该是一个好主意,但我仍然不清楚如何在这里应用它来替代窗口函数。

(提供多一点上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding

编辑

最初,我开始与Spark dynamic DAG is a lot slower and different from hard coded DAG其如下所示。好的是,每一列似乎都是独立/并行的。缺点是连接(即使是一个300 MB的小数据集)会变得“太大”,并导致无响应的火花。

handleBiasOriginal("col1", df) 
    .join(handleBiasOriginal("col2", df), df.columns) 
    .join(handleBiasOriginal("col3TooMany", df), df.columns) 
    .drop(columnsToDrop: _*).show 

    def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = { 
    val pre1_1 = df 
     .filter(df(target) === 1) 
     .groupBy(col, target) 
     .agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col)) 
     .drop(target) 

    val pre2_1 = df 
     .groupBy(col) 
     .agg(mean(target).alias("pre2_" + col)) 

    df 
     .join(pre1_1, Seq(col), "left") 
     .join(pre2_1, Seq(col), "left") 
     .na.fill(0) 
    } 

此图片与火花2.1.0,从Spark dynamic DAG is a lot slower and different from hard coded DAG图像是用2.0.2 toocomplexDAG

当施加 df.cache handleBiasOriginal缓存的DAG会有点简单(” col1“,df)。 ...

除了窗口函数,还有什么其他的可能性可以优化SQL吗? 充其量,如果SQL是动态生成的,那就太好了。

caching

回答

1

这里的要点是避免不必要的洗牌。现在,您的代码将为您要包含的每个列进行两次洗牌,并且结果数据布局不能在列之间重复使用。

为简单起见,我假设target始终是二进制({0,1}),并且您使用的所有其余列都是StringType。此外,我假设列的基数足够低,以便将结果分组并在本地处理。您可以调整这些方法来处理其他情况,但它需要更多的工作。

RDD API

  • 从广角到长

    重塑数据:

    import org.apache.spark.sql.functions._ 
    
    val exploded = explode(array(
        (columnsToDrop ++ columnsToCode).map(c => 
        struct(lit(c).alias("k"), col(c).alias("v"))): _* 
    )).alias("level") 
    
    val long = df.select(exploded, $"TARGET") 
    
  • aggregateByKey,重塑和收集:

    import org.apache.spark.util.StatCounter 
    
    val lookup = long.as[((String, String), Int)].rdd 
        // You can use prefix partitioner (one that depends only on _._1) 
        // to avoid reshuffling for groupByKey 
        .aggregateByKey(StatCounter())(_ merge _, _ merge _) 
        .map { case ((c, v), s) => (c, (v, s)) } 
        .groupByKey 
        .mapValues(_.toMap) 
        .collectAsMap 
    
  • 您可以使用lookupŧ o获得单个列和级别的统计信息。例如:

    lookup("col1")("A") 
    
    org.apache.spark.util.StatCounter = 
        (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000) 
    

    为您提供col1数据,A水平。基于二进制TARGET的假设,这个信息是完整的(你得到两个类的计数/分数)。

    您可以使用查找这样生成SQL表达式或将它传递给udf并将其应用在各个列。

数据帧API

  • 转换数据,只要对RDD API。基于层次
  • 计算聚集:

    val stats = long 
        .groupBy($"level.k", $"level.v") 
        .agg(mean($"TARGET"), sum($"TARGET")) 
    
  • 根据自己的喜好,你可以重塑这个能够高效率地加入或转换到本地收集和类似于RDD解决方案。

0

使用aggregateByKey 上aggregateByKey一个简单的解释可以发现here。基本上你使用两个函数:一个在分区内工作,另一个在分区之间工作。

您需要在第一列执行类似聚合的操作,并在内部使用映射为第二列的每个元素构建一个数据结构,以在其中汇总和收集数据(当然,如果需要,可以执行两个aggregateByKey) 。 这不会解决你想要使用的每一列的代码上执行多次运行的情况(你可以使用聚合而不是aggregateByKey来处理所有数据并将其放入地图中,但这可能会使你甚至性能较差)。结果将是每个键一行,如果你想回到原始记录(如窗口函数那样),你实际上需要将该值与原始的RDD结合或者在内部保存所有值并且平坦地图

我不相信这会为你提供任何真正的性能改进。你会做很多工作来重新实现在SQL中为你完成的任务,同时这样做会失去SQL的大部分优点(催化剂优化,钨内存管理,全部代码生成等等)。)

改进SQL

我会做的反而是试图改善SQL本身。 例如,窗口函数中列的结果对于所有值看起来都是相同的。你真的需要一个窗口功能吗?你可以做一个groupBy而不是一个窗口函数(如果你真的需要这个记录,你可以尝试加入结果。这可能会提供更好的性能,因为它不一定意味着在每一步中都洗牌两次)。

+0

请参阅http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag以及我的编辑以上。最初,我开始使用group-by和join。这导致一项工作没有在合理时间内完成/晶石似乎没有执行任何操作。虽然联接解决方案适用于小数据,但我无法使其与许多列一起工作。期待如何改进SQL的建议。 –

+0

我不是说连接就是解决方案。我说的是,在大多数情况下,使用aggregateByKey的RDD会比较慢。你可以继续使用我展示的链接和如何实现它的基本逻辑来尝试aggregateByKey。 –

+0

同时,您是否看到一种不使用慢窗口函数的方法,但仍然阻止使用连接? –