You can also create a custom aggregation function and use it inside a window function.
Something like this (written freehand, so there may be some mistakes):
package com.myuadfs

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF() extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))

  def bufferSchema: StructType = StructType(Array(StructField("compounded", DoubleType)))

  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 1.0 // set compounded to 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // multiply the running product by (1 + Return)
    buffer(0) = buffer.getDouble(0) * (input.getDouble(0) + 1)
  }

  // This generally merges two aggregation buffers. This means it would not
  // have worked properly had you been working with a regular aggregate, but
  // since you are planning to use this only inside a window, it should not
  // be called at all.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Double = {
    buffer.getDouble(0)
  }
}
Now you can use it inside a window function. Something like this:
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.orderBy("date")
val newDF = df.withColumn("compounded", new MyUDAF()(df("Return")).over(windowSpec))
Note that this entire calculation has to fit in a single partition, so if your data is too large you will run into problems. That said, nominally this kind of operation is performed after partitioning by some key (e.g., adding a partitionBy to the window), and then a single element should belong to one key's group.
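For example, a minimal sketch of a keyed window might look like the following (the "symbol" column here is a hypothetical grouping key, not part of the original question):

// Hypothetical: compound returns separately per "symbol", so each key's
// computation is confined to its own window partition.
val keyedSpec = Window.partitionBy("symbol").orderBy("date")
val perKeyDF = df.withColumn("compounded", new MyUDAF()(df("Return")).over(keyedSpec))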
Is your sample correct? In the second row, shouldn't the value of compounded be 0.98? –
Yes, it should be 0.98. Sorry for the mistake –