2015-05-21 109 views
6

我正在使用Spark Scala API。我有一个Spark SQL数据帧(从Avro的文件中读取)与下面的模式:如何在flatMap中使用Spark SQL DataFrame?

root 
|-- ids: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: integer 
| | |-- value: string (valueContainsNull = true) 
|-- match: array (nullable = true) 
| |-- element: integer (containsNull = true) 

本质上2列[IDS:列表[图[诠释,字符串]],匹配:列表[INT]]。

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)] 
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)] 
... 

我想这样做是flatMap()每一行产生3列[ID财产比赛]:看起来像的样本数据。使用上面两行作为输入数据,我们将得到:

[1,a,0] 
[2,b,0] 
[3,c,1] 
[4,d,0] 
[5,c,1] 
[6,a,0] 
[7,e,1] 
[8,d,0] 
... 

,然后groupByString财产(例如:A,B,...)产生count("property")sum("match")

a 2 0 
b 1 0 
c 2 2 
d 2 0 
e 1 1 

我希望做这样的事情:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray()) 
result.groupBy("property").agg(Map(
    "property" -> "count", 
    "match" -> "sum")) 

的问题是flatMap将DataFrame转换为RDD。是否有一种好方法可以使用DataFrames执行flatMap类型的操作,然后使用groupBy

回答

8

flatMap做什么你想要的?它将每个输入行转换为0或更多行。它可以过滤出来,或者可以添加新的。在SQL中,您可以使用join获得相同的功能。你能用join做你想做的事吗?

或者,您也可以看看Dataframe.explode,这仅仅是一个特定种类的join(你可以很容易地手艺自己explode通过加入一个数据帧到UDF)。 explode将单个列作为输入,并让您将其拆分或将其转换为多个值,然后将原始行重新转换回新行。所以:

user  groups 
griffin mkt,it,admin 

将变成:

user  group 
griffin mkt 
griffin it 
griffin admin 

所以,我要说看看DataFrame.explode如果不让你有轻松,尝试用UDF的连接。

+0

谢谢你的回答! DataFrame.explode方法正是我所需要的。 –

0

我的SQL有点生疏,但您的flatMap中有一个选项可以生成Row对象列表,然后您可以将生成的RDD转换回DataFrame。

相关问题