Window
函数应该只做部分技巧。其他部分的技巧可以通过定义udf
功能
def div = udf((age: Double, lag: Double) => lag/age)
首先要做,我们需要用Window
功能找到lag
,然后传递lag
和age
在udf
功能找到div
进口sqlContext.implicits._ 进口org.apache.spark.sql.functions._
val dataframe = Seq(
("A",100),
("A",50),
("A",20),
("A",4)
).toDF("person", "Age")
val windowSpec = Window.partitionBy("person").orderBy(col("Age").desc)
val newDF = dataframe.withColumn("lag", lag(dataframe("Age"), 1) over(windowSpec))
最后CAL的UDF功能
newDF.filter(newDF("lag").isNotNull).withColumn("div", div(newDF("Age"), newDF("lag"))).drop("Age", "lag").show
最后的结果将是
+------+---+
|person|div|
+------+---+
| A|2.0|
| A|2.5|
| A|5.0|
+------+---+
编辑 作为@Jacek已经提出了一个更好的解决方案使用.na.drop
,而不是.filter(newDF("lag").isNotNull)
和使用/
运营商,所以我们甚至不需要调用udf
功能
newDF.na.drop.withColumn("div", newDF("lag")/newDF("Age")).drop("Age", "lag").show
是的。这是窗口聚合函数的“工作”。你用'/'吗? –
@JacekLaskowski,我无法在'functions'中找到'/'或类似的东西。 – summerbulb