2015-11-16 184 views
10

我火花新手,我想下面源数据帧(负载从JSON文件)转换:下面结果数据帧火花数据帧变换多行

+--+-----+-----+ 
|A |count|major| 
+--+-----+-----+ 
| a| 1| m1| 
| a| 1| m2| 
| a| 2| m3| 
| a| 3| m4| 
| b| 4| m1| 
| b| 1| m2| 
| b| 2| m3| 
| c| 3| m1| 
| c| 4| m3| 
| c| 5| m4| 
| d| 6| m1| 
| d| 1| m2| 
| d| 2| m3| 
| d| 3| m4| 
| d| 4| m5| 
| e| 4| m1| 
| e| 5| m2| 
| e| 1| m3| 
| e| 1| m4| 
| e| 1| m5| 
+--+-----+-----+ 

进入

+--+--+--+--+--+--+ 
|A |m1|m2|m3|m4|m5| 
+--+--+--+--+--+--+ 
| a| 1| 1| 2| 3| 0| 
| b| 4| 2| 1| 0| 0| 
| c| 3| 0| 4| 5| 0| 
| d| 6| 1| 2| 3| 4| 
| e| 4| 5| 1| 1| 1| 
+--+--+--+--+--+--+ 

这里是转化规律

  1. 结果数据帧是由具有A + (n major columns)其中major列名由指定:

    sorted(src_df.map(lambda x: x[2]).distinct().collect()) 
    
  2. 结果数据帧包含m行,其中通过提供用于A列的值:

    sorted(src_df.map(lambda x: x[0]).distinct().collect()) 
    
  3. 每个专业的价值结果数据帧中的列是来自相应的A上的源数据帧的值和主要的 (例如,在源数据帧中第1行的计数被映射到box其中Aa和列m1

  4. 在源数据帧Amajor的组合不具有重复(请考虑它在所述两列的主键SQL)

+0

我试图来从现有的数据库与表的确切格式的一些数据,如每“A”各行“大”代表一个功能到'A',所以我需要将数据帧转换为我发布的格式,以便将数据移植到spark ALS计算中。 – resec

+0

不要让我知道我是否以错误的方式使用火花,并且应该在火花之外进行这种数据转换。 – resec

回答

8

让我们开始与示例数据:

df = sqlContext.createDataFrame([ 
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), 
    ("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), 
    ("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), 
    ("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), 
    ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), 
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), 
    ("e", 1, "m4"), ("e", 1, "m5")], 
    ("a", "cnt", "major")) 

请注意,我已经改变了countcnt。 Count是大多数SQL方言中的保留关键字,它不是列名的好选择。

至少有两种方法可以重塑这个数据:

  • 聚集在数据帧

    from pyspark.sql.functions import col, when, max 
    
    majors = sorted(df.select("major") 
        .distinct() 
        .map(lambda row: row[0]) 
        .collect()) 
    
    cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) 
        for m in majors] 
    maxs = [max(col(m)).alias(m) for m in majors] 
    
    reshaped1 = (df 
        .select(col("a"), *cols) 
        .groupBy("a") 
        .agg(*maxs) 
        .na.fill(0)) 
    
    reshaped1.show() 
    
    ## +---+---+---+---+---+---+ 
    ## | a| m1| m2| m3| m4| m5| 
    ## +---+---+---+---+---+---+ 
    ## | a| 1| 1| 2| 3| 0| 
    ## | b| 4| 1| 2| 0| 0| 
    ## | c| 3| 0| 4| 5| 0| 
    ## | d| 6| 1| 2| 3| 4| 
    ## | e| 4| 5| 1| 1| 1| 
    ## +---+---+---+---+---+---+ 
    
  • groupBy在RDD

    from pyspark.sql import Row 
    
    grouped = (df 
        .map(lambda row: (row.a, (row.major, row.cnt))) 
        .groupByKey()) 
    
    def make_row(kv): 
        k, vs = kv 
        tmp = dict(list(vs) + [("a", k)]) 
        return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors}) 
    
    reshaped2 = sqlContext.createDataFrame(grouped.map(make_row)) 
    
    reshaped2.show() 
    
    ## +---+---+---+---+---+---+ 
    ## | a| m1| m2| m3| m4| m5| 
    ## +---+---+---+---+---+---+ 
    ## | a| 1| 1| 2| 3| 0| 
    ## | e| 4| 5| 1| 1| 1| 
    ## | c| 3| 0| 4| 5| 0| 
    ## | b| 4| 1| 2| 0| 0| 
    ## | d| 6| 1| 2| 3| 4| 
    ## +---+---+---+---+---+---+ 
    
+0

出于好奇,什么'reshaped1 =(df.select(col(“a”),* cols).groupBy(“a”)。agg(* maxs).na.fill(0))'代表in Scala呢?我有麻烦了解* cols和* maxs。 – eliasah

+1

@eliasah它是参数解包,大致等同于这样的'cols:Seq [Column] = ???; df.select(cols:_ *)' – zero323

+0

好的谢谢!但那么agg(* max)将如何转换?因为agg需要一个表达式。 – eliasah

3

使用zero323的据帧,

df = sqlContext.createDataFrame([ 
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), 
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"), 
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"), 
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"), 
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"), 
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), 
("e", 1, "m4"), ("e", 1, "m5")], 
("a", "cnt", "major")) 

你也可以使用

reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0) 
+0

这是一个很好的解决方案。谢谢!这里是它的Scala版本,如果需要的话:val reshaped_df = transposed_res.groupBy(“a”)。pivot(“major”).max(“cnt”)。na.fill(0) – Hako