2017-06-16 41 views
0

假设我有类型,如这些:Dataset.groupByKey +无类型聚合函数

case class SomeType(id: String, x: Int, y: Int, payload: String) 
case class Key(x: Int, y: Int) 

然后假设我在Dataset[SomeType]这样做groupByKey

val input: Dataset[SomeType] = ... 

val grouped: KeyValueGroupedDataset[Key, SomeType] = 
    input.groupByKey(s => Key(s.x, s.y)) 

然后假设我有确定功能我想在聚合中使用哪个字段:

val chooseDistinguisher: SomeType => String = _.id 

A ND现在我想在分组数据集运行聚合的功能,例如,functions.countDistinct,使用由所述函数而获得的字段:

grouped.agg(
    countDistinct(<something which depends on chooseDistinguisher>).as[Long] 
) 

的问题是,我无法从chooseDistinguisher创建一个UDF,因为countDistinct接受一个Column,并将一个UDF变成一个Column您需要指定输入列名,我不能这样做 - 我不知道KeyValueGroupedDataset的“值”使用哪个名称。

我觉得应该是有可能的,因为KeyValueGroupedDataset本身有类似的功能:

def count(): Dataset[(K, Long)] = agg(functions.count("*").as(ExpressionEncoder[Long]())) 

但是,这种方法欺骗了一下,因为它使用"*"作为列名,但我需要指定特定列(即键值分组数据集中“值”的列)。另外,当您使用typed对象中的键入函数时,您也不需要指定列名称,并且它以某种方式工作。

那么,有没有可能做到这一点,如果是这样,如何做到这一点?

+1

听起来有点像一个宏的工作,虽然混合'DataFrames'和静态类型只会给你带来麻烦。另外,根本不要使用SQL,并使用['Aggregator'](https://stackoverflow.com/a/32101530/1560062)。 – zero323

+0

问题是,我想避免重新实现已经存在的所有聚合函数,所以我不想创建自定义聚合器。至于混合数据框和静态类型,好吧,在Spark 2.0中,DataFrame只是'Dataset [Row]'的别名,所以我希望只要提供了正确的类型信息,对数据框起作用的所有东西都应该同样适用于数据集。 –

+0

这就是为什么我想到宏。关于你的期望 - 'DataFrame'是'Dataset'的一个特例,并且大部分'DataFrame'优化,通常不适用于'Dataset'。无框提供了一个更理智的API方法,但仍然远远优化。 – zero323

回答

0

因为我知道这是不可能与agg转型,它预计TypedColumn类型是基于Column类型使用as方法构建的,因此您需要从不是类型安全表达式开始。如果有人知道解决方案,我很想看到它... ...

如果需要使用类型安全的集合,你可以使用以下方法一个:

  • mapGroups - 在这里你可以实现斯卡拉功能负责对于聚集Iterator
  • 实现自定义Aggregator上述

第一种方法需要更少的代码的建议,所以下面我展示简单的例子:

def countDistinct[T](values: Iterator[T])(chooseDistinguisher: T => String): Long = 
    values.map(chooseDistinguisher).toSeq.distinct.size 

ds 
    .groupByKey(s => Key(s.x, s.y)) 
    .mapGroups((k,vs) => (k, countDistinct(vs)(_.name))) 

在我看来,Spark数据集类型安全的API仍然远不及类型安全的DataFrame API成熟。前一段时间,我认为为Spark数据集实现简单易用的类型安全聚合API是个好主意。

0

目前,该用例最好使用DataFrame来处理,您可以稍后将其转换回Dataset[A]

// Code assumes SQLContext implicits are present 
import org.apache.spark.sql.{functions => f} 

val colName = "id" 
ds.toDF 
    .withColumn("key", f.concat('x, f.lit(":"), 'y)) 
    .groupBy('key) 
    .agg(countDistinct(f.col(colName)).as("cntd"))