2017-06-07 32 views
1

我必须使用RDD这样做的请求:如何使用数据集GROUPBY

val test = Seq(("New York", "Jack"), 
    ("Los Angeles", "Tom"), 
    ("Chicago", "David"), 
    ("Houston", "John"), 
    ("Detroit", "Michael"), 
    ("Chicago", "Andrew"), 
    ("Detroit", "Peter"), 
    ("Detroit", "George") 
) 
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println) 

结果是:

(纽约,列表(杰克))

(底特律,列表(迈克尔·彼得,乔治))

(洛杉矶,列表(汤姆))

(休斯顿,列表(约翰))

(芝加哥,列表(大卫,安德鲁))

如何做到这一点使用数据集spark2.0?

我有办法使用自定义功能,但感觉是如此复杂,有没有简单一点的方法

回答

1

我建议你开始创建一个case class作为

case class Monkey(city: String, firstName: String) 

case class应的主类之外定义。然后,你可以只使用toDS功能和使用groupByaggregation函数调用collect_list如下

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 

val test = Seq(("New York", "Jack"), 
    ("Los Angeles", "Tom"), 
    ("Chicago", "David"), 
    ("Houston", "John"), 
    ("Detroit", "Michael"), 
    ("Chicago", "Andrew"), 
    ("Detroit", "Peter"), 
    ("Detroit", "George") 
) 
sc.parallelize(test).map(row => Monkey(row._1, row._2)).toDS().groupBy("city").agg(collect_list("firstName") as "list").show(false) 

您将有输出

+-----------+------------------------+ 
|city  |list     | 
+-----------+------------------------+ 
|Los Angeles|[Tom]     | 
|Detroit |[Michael, Peter, George]| 
|Chicago |[David, Andrew]   | 
|Houston |[John]     | 
|New York |[Jack]     | 
+-----------+------------------------+ 

您可以随时打电话只是.rdd功能

转换回 RDD
0

首先,我会变成你的RDD到DataSet:

val spark: org.apache.spark.sql.SparkSession = ??? 
import spark.implicits._ 

val testDs = test.toDS() 

这里你得到你的col名:)使用它明智!

testDs.schema.fields.foreach(x => println(x)) 

在最后你只需要使用GROUPBY:

testDs.groupBy("City?", "Name?") 

RDD-S是不是真正的2.0版本的方式,我认为。 如果您有任何问题,请随时询问。

+1

'testDs.columns'甚至可以更快地得到没有类型的列名(作为'Array [String]')。 – Garren

+0

好点!真 –

0

要创建数据集,请首先在类别外定义一个案例类作为

case class Employee(city: String, name: String) 

然后您可以将列表转换为数据集作为

val spark = 
    SparkSession.builder().master("local").appName("test").getOrCreate() 
    import spark.implicits._ 
    val test = Seq(("New York", "Jack"), 
    ("Los Angeles", "Tom"), 
    ("Chicago", "David"), 
    ("Houston", "John"), 
    ("Detroit", "Michael"), 
    ("Chicago", "Andrew"), 
    ("Detroit", "Peter"), 
    ("Detroit", "George") 
    ).toDF("city", "name") 
    val data = test.as[Employee] 

或者

import spark.implicits._ 
    val test = Seq(("New York", "Jack"), 
     ("Los Angeles", "Tom"), 
     ("Chicago", "David"), 
     ("Houston", "John"), 
     ("Detroit", "Michael"), 
     ("Chicago", "Andrew"), 
     ("Detroit", "Peter"), 
     ("Detroit", "George") 
    ) 

    val data = test.map(r => Employee(r._1, r._2)).toDS() 

现在,您可以groupby和执行任何聚合为

data.groupBy("city").count().show 

data.groupBy("city").agg(collect_list("name")).show 

希望这有助于!