甲函数应为多个列中的数据帧火花迁移SQL窗函数RDD为更好的性能
def handleBias(df: DataFrame, colName: String, target: String = target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
这可以很好地被写为在火花SQL如上所示被执行,并且一个for循环。然而这导致了很多洗牌(spark apply function to columns in parallel)。
小例子:
val df = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
我怎样才能制定这个更有效使用RDD API? aggregateByKey
应该是一个好主意,但我仍然不清楚如何在这里应用它来替代窗口函数。
(提供多一点上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding)
编辑最初,我开始与Spark dynamic DAG is a lot slower and different from hard coded DAG其如下所示。好的是,每一列似乎都是独立/并行的。缺点是连接(即使是一个300 MB的小数据集)会变得“太大”,并导致无响应的火花。
handleBiasOriginal("col1", df)
.join(handleBiasOriginal("col2", df), df.columns)
.join(handleBiasOriginal("col3TooMany", df), df.columns)
.drop(columnsToDrop: _*).show
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
此图片与火花2.1.0,从Spark dynamic DAG is a lot slower and different from hard coded DAG图像是用2.0.2
当施加 df.cache handleBiasOriginal缓存的DAG会有点简单(” col1“,df)。 ...
除了窗口函数,还有什么其他的可能性可以优化SQL吗? 充其量,如果SQL是动态生成的,那就太好了。
请参阅http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag以及我的编辑以上。最初,我开始使用group-by和join。这导致一项工作没有在合理时间内完成/晶石似乎没有执行任何操作。虽然联接解决方案适用于小数据,但我无法使其与许多列一起工作。期待如何改进SQL的建议。 –
我不是说连接就是解决方案。我说的是,在大多数情况下,使用aggregateByKey的RDD会比较慢。你可以继续使用我展示的链接和如何实现它的基本逻辑来尝试aggregateByKey。 –
同时,您是否看到一种不使用慢窗口函数的方法,但仍然阻止使用连接? –