2015-07-28 83 views
11

使用Scala,我怎样才能将dataFrame分成多个dataFrame(无论是数组还是集合),并且具有相同的列值。 例如我要拆分的以下数据帧:如何将数据帧拆分为具有相同列值的数据框?

ID Rate State 
1 24 AL 
2 35 MN 
3 46 FL 
4 34 AL 
5 78 MN 
6 99 FL 

到:

数据集1

ID Rate State 
1 24 AL 
4 34 AL 

数据组2

ID Rate State 
2 35 MN 
5 78 MN 

数据组3

ID Rate State 
3 46 FL 
6 99 FL 
+1

为什么你需要在多个数据帧中拆分数据帧? (AL,Seq(24 AL,4 34 AL)),(MN,Seq(35 MN,5 78 MN)),(FL,Seq(46 FL 6 99 FL))]使用groupBy。 –

+0

groupBy给出GroupDate类型,我该如何将它转换为Array? – user1735076

+0

你能解释一下你想用这个数组做什么? – lev

回答

11

你可以收集独有的状态值,并简单地映射在结果数组:

val states = df.select("State").distinct.collect.flatMap(_.toSeq) 
val byStateArray = states.map(state => df.where($"State" <=> state)) 

或来图:

val byStateMap = states 
    .map(state => (state -> df.where($"State" <=> state))) 
    .toMap 

同样的事情在Python:

from itertools import chain 
from pyspark.sql.functions import col 

states = chain(*df.select("state").distinct().collect()) 

# PySpark 2.3 and later 
# In 2.2 and before col("state") == state) 
# should give the same outcome, ignoring NULLs 
# if NULLs are important 
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state) 
df_by_state = {state: 
    df.where(col("state").eqNullSafe(state)) for state in states} 

明显问题在于它需要对每个级别进行全面的数据扫描,所以这是一项昂贵的操作。如果你正在寻找一种方式,只是把标准输出也看到How do I split an RDD into two or more RDDs?

特别是你可以写Dataset所关心列分区:

val path: String = ??? 
df.write.partitionBy("State").parquet(path) 

,如果需要回读:

// Depend on partition prunning 
for { state <- states } yield spark.read.parquet(path).where($"State" === state) 

// or explicitly read the partition 
for { state <- states } yield spark.read.parquet(s"$path/State=$state") 

根据数据的大小,输入的分割,存储和持久级别的级别数可能比多个过滤器更快或更慢。

+0

也许晚一点的问题。但是当我在Spark 2.2.0中尝试python代码时,我总是得到一个“列不可调用”的错误。我尝试了几种方法,但仍然遇到同样的错误。任何解决方法? – inneb

-1

如果您将数据框设置为临时表,则非常简单(如果spark版本为2)。

df1.createOrReplaceTempView("df1") 

现在你可以做的查询,

var df2 = spark.sql("select * from df1 where state = 'FL'") 
var df3 = spark.sql("select * from df1 where state = 'MN'") 
var df4 = spark.sql("select * from df1 where state = 'AL'") 

现在你得到了DF2,DF3,DF4。如果你想让它们成为列表,你可以使用,

df2.collect() 
df3.collect() 

甚至映射/过滤功能。请参考https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

+0

有没有可能在火花中循环SQL查询?在收集所有不同的值之前,然后用“where state ='i'”替换“where state ='FL'”或类似的东西? – inneb

+0

这将是开销,但你仍然可以使用Spark Dataframes和SCALA编码来处理它 – ashK

相关问题