假设我有类型,如这些:Dataset.groupByKey +无类型聚合函数
case class SomeType(id: String, x: Int, y: Int, payload: String)
case class Key(x: Int, y: Int)
然后假设我在Dataset[SomeType]
这样做groupByKey
:
val input: Dataset[SomeType] = ...
val grouped: KeyValueGroupedDataset[Key, SomeType] =
input.groupByKey(s => Key(s.x, s.y))
然后假设我有确定功能我想在聚合中使用哪个字段:
val chooseDistinguisher: SomeType => String = _.id
A ND现在我想在分组数据集运行聚合的功能,例如,functions.countDistinct
,使用由所述函数而获得的字段:
grouped.agg(
countDistinct(<something which depends on chooseDistinguisher>).as[Long]
)
的问题是,我无法从chooseDistinguisher
创建一个UDF,因为countDistinct
接受一个Column
,并将一个UDF变成一个Column
您需要指定输入列名,我不能这样做 - 我不知道KeyValueGroupedDataset
的“值”使用哪个名称。
我觉得应该是有可能的,因为KeyValueGroupedDataset
本身有类似的功能:
def count(): Dataset[(K, Long)] = agg(functions.count("*").as(ExpressionEncoder[Long]()))
但是,这种方法欺骗了一下,因为它使用"*"
作为列名,但我需要指定特定列(即键值分组数据集中“值”的列)。另外,当您使用typed
对象中的键入函数时,您也不需要指定列名称,并且它以某种方式工作。
那么,有没有可能做到这一点,如果是这样,如何做到这一点?
听起来有点像一个宏的工作,虽然混合'DataFrames'和静态类型只会给你带来麻烦。另外,根本不要使用SQL,并使用['Aggregator'](https://stackoverflow.com/a/32101530/1560062)。 – zero323
问题是,我想避免重新实现已经存在的所有聚合函数,所以我不想创建自定义聚合器。至于混合数据框和静态类型,好吧,在Spark 2.0中,DataFrame只是'Dataset [Row]'的别名,所以我希望只要提供了正确的类型信息,对数据框起作用的所有东西都应该同样适用于数据集。 –
这就是为什么我想到宏。关于你的期望 - 'DataFrame'是'Dataset'的一个特例,并且大部分'DataFrame'优化,通常不适用于'Dataset'。无框提供了一个更理智的API方法,但仍然远远优化。 – zero323