2016-05-27 90 views
7

枢轴String列,我有一个简单的数据帧是这样的:上Pyspark数据帧

rdd = sc.parallelize(
    [ 
     (0, "A", 223,"201603", "PORT"), 
     (0, "A", 22,"201602", "PORT"), 
     (0, "A", 422,"201601", "DOCK"), 
     (1,"B", 3213,"201602", "DOCK"), 
     (1,"B", 3213,"201601", "PORT"), 
     (2,"C", 2321,"201601", "DOCK") 
    ] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

df_data.show() 
+---+----+----+------+----+ 
| id|type|cost| date|ship| 
+---+----+----+------+----+ 
| 0| A| 223|201603|PORT| 
| 0| A| 22|201602|PORT| 
| 0| A| 422|201601|DOCK| 
| 1| B|3213|201602|DOCK| 
| 1| B|3213|201601|PORT| 
| 2| C|2321|201601|DOCK| 
+---+----+----+------+----+ 

,我需要按日期来透视:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show() 

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|2321.0| null| null| 
| 0| A| 422.0| 22.0| 223.0| 
| 1| B|3213.0|3213.0| null| 
+---+----+------+------+------+ 

一切正常。但现在我需要转动它,并得到一个非数字列:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show() 

,当然我会得到一个异常:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;' 

我想对

线产生的东西
+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|DOCK | null| null| 
| 0| A| DOCK | PORT| DOCK| 
| 1| B|DOCK |PORT | null| 
+---+----+------+------+------+ 

这可能与pivot

回答

10

假设(id |type | date)组合是独一无二的,你唯一的目标正在枢转,而不是聚集你可以使用first(或不局限于数值的任何其它功能):

from pyspark.sql.functions import first 

(df_data 
    .groupby(df_data.id, df_data.type) 
    .pivot("date") 
    .agg(first("ship")) 
    .show()) 

## +---+----+------+------+------+ 
## | id|type|201601|201602|201603| 
## +---+----+------+------+------+ 
## | 2| C| DOCK| null| null| 
## | 0| A| DOCK| PORT| PORT| 
## | 1| B| PORT| DOCK| null| 
## +---+----+------+------+------+ 

如果这些假设是不正确的你”你必须预先汇总你的数据。例如对于最常见的ship值:

from pyspark.sql.functions import max, struct 

(df_data 
    .groupby("id", "type", "date", "ship") 
    .count() 
    .groupby("id", "type") 
    .pivot("date") 
    .agg(max(struct("count", "ship"))) 
    .show()) 

## +---+----+--------+--------+--------+ 
## | id|type| 201601| 201602| 201603| 
## +---+----+--------+--------+--------+ 
## | 2| C|[1,DOCK]| null| null| 
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]| 
## | 1| B|[1,PORT]|[1,DOCK]| null| 
## +---+----+--------+--------+--------+