You can collect the unique state values and simply map over the resulting array:
// Collect the distinct values of the State column into a local array
val states = df.select("State").distinct.collect.flatMap(_.toSeq)
// Build one DataFrame per state, using null-safe equality (<=>)
val byStateArray = states.map(state => df.where($"State" <=> state))
or a map:
val byStateMap = states
  .map(state => (state -> df.where($"State" <=> state)))
  .toMap
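The map can then be queried directly. For example (a hypothetical lookup, assuming "AL" is one of the values in the column):

// Hypothetical usage: fetch and show the sub-DataFrame for one state.
// The keys are of type Any, since flatMap(_.toSeq) erases the column type.
byStateMap.get("AL").foreach(_.show())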
The same thing in Python:
from itertools import chain
from pyspark.sql.functions import col, lit

states = chain(*df.select("state").distinct().collect())

# eqNullSafe requires PySpark 2.3 and later.
# In 2.2 and before, col("state") == state
# gives the same outcome, ignoring NULLs.
# If NULLs are important, use:
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
df_by_state = {
    state: df.where(col("state").eqNullSafe(state))
    for state in states
}
The obvious problem here is that it requires a full data scan for each level, so it is an expensive operation. If you're looking for a way to just split the output, see also How do I split an RDD into two or more RDDs?
In particular you can write the Dataset partitioned by the column of interest:
val path: String = ???
// Writes one Parquet sub-directory per distinct State value
df.write.partitionBy("State").parquet(path)
and read it back if needed:
// Depends on partition pruning
for { state <- states } yield spark.read.parquet(path).where($"State" === state)
// or explicitly read the partition
for { state <- states } yield spark.read.parquet(s"$path/State=$state")
Depending on the size of the data, the number of levels you split by, and the storage and persistence level of the input, this may be faster or slower than multiple filters.
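If you stay with the multiple-filter approach, one mitigation (a sketch, not part of the original answer) is to persist the input first, so that after the first action the per-state filters read from the cache instead of rescanning the source:

// Sketch, reusing the df and states values from above.
// persist() marks df for caching; the cache is populated on the first action.
df.persist()
val byStateCached = states.map(state => df.where($"State" <=> state))
// ... run your per-state actions, then release the cache:
df.unpersist()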
Why do you need to split the dataframe into multiple dataframes? You can get [(AL,Seq((2,4,AL),(4,34,AL))),(MN,Seq((3,5,MN),(5,78,MN))),(FL,Seq((4,6,FL),(6,99,FL)))] using groupBy. –
groupBy gives a GroupedData type, how can I convert it to an Array? – user1735076
Can you explain what you want to do with this array? – lev
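For reference, a minimal Scala sketch of the groupBy approach suggested in the comments (the "id" and "value" column names are assumptions for illustration): groupBy returns a grouped dataset (GroupedData / RelationalGroupedDataset), which must be aggregated, e.g. with collect_list, before it can be collected into an array:

import org.apache.spark.sql.functions.{collect_list, struct}

// Hypothetical columns "id" and "value" alongside "State".
val grouped = df
  .groupBy($"State")
  .agg(collect_list(struct($"id", $"value")).as("rows"))

// collect() yields one Row per state, each carrying an array of structs.
val byStateRows = grouped.collect()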