2017-06-06 63 views
2

在Spark-Sql版本1.6中,使用DataFrame s,是否有一种方法可以针对特定列计算每行的除以当前行和下一个行的分数?如何将当前行的值与下列值相除?

例如,如果我有一列一个表,像这样

Age 
100 
50 
20 
4 

我想下面的输出

Franction 
2 
2.5 
5 

最后一行被丢弃,因为它没有“下一个行“添加到。

现在我正在通过对表格进行排名并将其与自身结合起来,其中rank等于rank+1

有没有更好的方式来做到这一点? 这可以用Window函数完成吗?

+0

是的。这是窗口聚合函数的“工作”。你用'/'吗? –

+0

@JacekLaskowski,我无法在'functions'中找到'/'或类似的东西。 – summerbulb

回答

2

Window函数应该只做部分技巧。其他部分的技巧可以通过定义udf功能

def div = udf((age: Double, lag: Double) => lag/age) 

首先要做,我们需要用Window功能找到lag,然后传递lagageudf功能找到div 进口sqlContext.implicits._ 进口org.apache.spark.sql.functions._

val dataframe = Seq(
    ("A",100), 
    ("A",50), 
    ("A",20), 
    ("A",4) 
).toDF("person", "Age") 

val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc) 
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec)) 

最后CAL的UDF功能

newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show 

最后的结果将是

+------+---+ 
|person|div| 
+------+---+ 
|  A|2.0| 
|  A|2.5| 
|  A|5.0| 
+------+---+ 

编辑 作为@Jacek已经提出了一个更好的解决方案使用.na.drop,而不是.filter(newDF("lag").isNotNull)和使用/运营商,所以我们甚至不需要调用udf功能

newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show 
+2

谢谢。根据你的回答,我编写了以下内容(无UDF):'dataframe.select($“person”,$“Age”/(lead(“Age”,1)over windowSpec)为“div”)。 drop.show' – summerbulb

相关问题