0

我有一个数据框,它具有几个属性(C1到C2),偏移量(以天为单位)和几个值(V1,V2)。Spark Dataframe/Dataset:通用条件累积和

val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset") 
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields] 

scala> inputDF.show 
+---+---+---+---+------+ 
| c1| c2| v1| v2|offset| 
+---+---+---+---+------+ 
| 1| 2| 30|100| -1| 
| 1| 2| 30|100|  0| 
| 1| 2| 30|100|  1| 
| 11| 21| 30|100| -1| 
| 11| 21| 30|100|  0| 
| 11| 21| 30|100|  1| 
+---+---+---+---+------+ 

我需要做的是,计算(c1,c2)在整个偏移量上的V1,V2的累积和。

我尝试过,但这远离可以在任何数据框上工作的通用解决方案。

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(inputDF("v1")).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

+---+---+---+---+------+---------------------------- 
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2| 
+---+---+---+---+------+-------------|--------------| 
| 1| 2| 30|100| -1|30   | 100   | 
| 1| 2| 30|100|  0|60   | 200   | 
| 1| 2| 30|100|  1|90   | 300   | 
| 11| 21| 30|100| -1|30   | 100   | 
| 11| 21| 30|100|  0|60   | 200   | 
| 11| 21| 30|100|  1|90   | 300   | 
+---+---+---+---+------+----------------------------- 

的挑战是[α]我需要做这种跨多个和不同偏移的窗口(-1至1),(-10〜10),(-30〜30)或任何其它并[b]我需要在多个数据框/数据集中使用此功能,所以我希望能够使用RDD/Dataset中的通用函数。

有关我如何在Spark 2.0中实现这一点的任何想法?

非常感谢帮助。谢谢!

+0

欢迎堆栈溢出!我们是一个问答网站,而不是一个打码人员的服务。请解释你到目前为止尝试过的以及为什么它没有奏效。请参阅:[为什么“有人可以帮助我?”不是一个实际的问题?](http://meta.stackoverflow.com/q/284236) –

+0

谢谢。我用我的解决方案获得了上述结果集。现在添加它。 – Yash

回答

0

下面是使用数据框的原始图片。

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val inputDF= spark 
    .sparkContext 
    .parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10) 
    .toDF("c1", "c2", "v1", "v2", "offset") 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

这产生了对于不同窗口的单个“值”的累计总和。

scala> outputDF.show 
+---+---+---+---+------+-------------+-------------+-------------+    
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2| 
+---+---+---+---+------+-------------+-------------+-------------+ 
| 1| 2| 3|100| -2|   0|   0|   100| 
| 1| 2| 30|100| -1|   30|   30|   200| 
| 1| 2| 30|100|  0|   60|   60|   300| 
| 1| 2| 30|100|  1|   90|   90|   400| 
| 1| 2|140|100|  2|   90|   90|   500| 
| 11| 21| 30|100| -1|   30|   30|   100| 
| 11| 21| 30|100|  0|   60|   60|   200| 
| 11| 21| 30|100|  1|   90|   90|   300| 
+---+---+---+---+------+-------------+-------------+-------------+ 

几个这种方法的缺点 - [1]各条件窗口(-1,1),(-2,2)或任何(from_offset,to_offset),和()需要分开呼叫。 [2]这不是一个通用功能。

我知道火花接受列的聚集函数变量列表这样的 -

val exprs = Map("v1" -> "sum", "v2" -> "sum") 

但我不能确定如何扩展该具有可变条件窗口功能。我仍然很想知道是否有更好的模块化/可重用功能,我们可以写出来解决这个问题。