2016-08-27 126 views
3

我有一个数据帧,并且我想按列进行分组,并使用相同的模式将这些组转换回数据帧。原因是我想要在整个组中映射一个带有签名DataFrame -> String的函数。下面是我想:如何从分组数据中获取火花数据帧

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF 
val schema = df.schema 
val groups = df.rdd.groupBy(x => x(0)) 
       .mapValues(g => sqlContext.createDataFrame(sc.makeRDD(g.toList), schema)) 
       .take(1) 

这里就是我希望的:

scala> groups(0)._2.collect 
Array[org.apache.spark.sql.Row] = Array([1,2,3], [1,2,4])  

,但它不工作(任务与NullPointerException失败的)......我想你不能地图一个引用火花上下文的函数,但我不知道如何实现这个功能?

+1

能你提供一个例子。 您期望输入和输出什么? – giaosudau

回答

1

我猜你不能映射是指火花背景

正确的功能 - 你不能传递到任何一个函数内使用任何火花的上下文对象(或RDDS,或Dataframes)的Spark的高级函数,因为这需要将这些对象序列化并将它们发送给执行程序,但它们故意不可序列化,因为它没有任何意义(每个执行程序必须像另一个驱动程序应用程序一样行事) 。

为了实现只包含一个“组”一个数据帧,我推荐使用的filter代替groupBy:你可以先collect所有组密钥,然后每一个映射到数据帧过滤:

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF 

df.cache() // EDIT: this might speed this up significantly, as DF will be reused instead of recalculated for each key 

val groupKeys: Array[Int] = df.map { case Row(i: Int, _, _) => i }.distinct().collect() 
val dfPerKey: Array[DataFrame] = groupKeys.map(k => df.filter($"_1" === k)) 

dfPerKey.foreach(_.show()) 
// prints: 
// +---+---+---+ 
// | _1| _2| _3| 
// +---+---+---+ 
// | 1| 2| 3| 
// | 1| 2| 4| 
// +---+---+---+ 
// 
// +---+---+---+ 
// | _1| _2| _3| 
// +---+---+---+ 
// | 2| 3| 4| 
// +---+---+---+ 
+0

嗨Tzach,感谢羚牛的时间来回答。这是一个聪明的方法,不幸的是它很慢,我认为它为每个过滤器做了一个完整的洗牌(对于我的大数据框,它为每个组运行一个单独的阶段!任何想法如何更接近'groupBy'的性能? – maxymoo

+0

如果您在收集密钥并迭代它们之前添加'df.cache()',可能会有所帮助 –

+1

感谢Tzach,修复它!感谢您的耐心,我仍然非常新的火花! – maxymoo