2016-11-22 69 views

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html explains nicely how pivot works in Spark. What I want, though, is a Spark pivot without aggregation.

In my Python code I use pandas without an aggregation, just a reset of the index and a join:

countryToMerge = pd.pivot_table(data=dfCountries, index=['A'], columns=['B']) 
countryToMerge.index.name = 'ISO' 
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner') 

How does this work in Spark?

I tried to group and join manually, like this:

val grouped = countryKPI.groupBy("A").pivot("B") 
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show 

but that doesn't work. How does reset_index map to Spark? Or how can this be implemented in a Spark-native way?
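For what it's worth, the manual attempt fails because in Spark `groupBy(...).pivot(...)` returns a `RelationalGroupedDataset`, not a `DataFrame`, so an aggregation has to follow before anything can be joined. The saving grace: when each (group, pivot) pair holds exactly one row, any aggregate just passes the value through unchanged. A quick pandas check of that property on the example data (a sketch I added, not part of the original question):

```python
import pandas as pd

# Same data as the example below; each (country_id3, indicator_id)
# combination appears exactly once.
countryKPI = pd.DataFrame({'country_id3': ['ABC', 'POL', 'ABC', 'POL'],
                           'indicator_id': ['a', 'a', 'b', 'b'],
                           'value': [7, 8, 9, 7]})

# Largest cell size is 1, so any order-insensitive aggregate
# (avg, min, max, first, ...) returns each value unchanged.
cell_sizes = countryKPI.groupby(['country_id3', 'indicator_id']).size()
print(cell_sizes.max())  # 1
```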

Edit

A small example of the Python code:

import pandas as pd 
from datetime import datetime, timedelta 
import numpy as np 
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"]) 
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO']) 
dates['ISO'] = isos.ISO 
dates['ISO'] = dates['ISO'].astype("category") 
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'], 
         'indicator_id':['a','a','b','b'], 
         'value':[7,8,9,7]}) 
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id']) 
countryToMerge.index.name = 'ISO' 
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')) 

        dates  ISO  a  b 
0  2016-01-01  ABC  7  9 
1  2016-01-03  ABC  7  9 
2  2016-01-05  ABC  7  9 
3  2016-01-07  ABC  7  9 
4  2016-01-09  ABC  7  9 
5  2016-01-02  POL  8  7 
6  2016-01-04  POL  8  7 
7  2016-01-06  POL  8  7 
8  2016-01-08  POL  8  7 
9  2016-01-10  POL  8  7 

And the same to follow along in Scala/Spark:

val dates = Seq(("2016-01-01", "ABC"), 
  ("2016-01-02", "ABC"), 
  ("2016-01-03", "POL"), 
  ("2016-01-04", "ABC"), 
  ("2016-01-05", "POL"), 
  ("2016-01-06", "ABC"), 
  ("2016-01-07", "POL"), 
  ("2016-01-08", "ABC"), 
  ("2016-01-09", "POL"), 
  ("2016-01-10", "ABC") 
).toDF("dates", "ISO") 
  .withColumn("dates", 'dates.cast("Date")) 

dates.show 
dates.printSchema 

val countryKPI = Seq(("ABC", "a", 7), 
  ("ABC", "b", 8), 
  ("POL", "a", 9), 
  ("POL", "b", 7) 
).toDF("country_id3", "indicator_id", "value") 

countryKPI.show 
countryKPI.printSchema 

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id") 

Answer


The following snippet seems to work - but I am not sure whether aggregating via avg is correct, even though the output numbers "fit".

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show 

I wonder whether this is inefficient for larger amounts of data, compared with just reusing the values (since I don't actually want to aggregate).
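One way to avoid the averaging semantics (my suggestion, not stated in the thread): aggregate with first instead of avg. Since every (country_id3, indicator_id) pair holds exactly one row, first simply passes the raw value through without any arithmetic. In Spark that would be `countryKPI.groupBy("country_id3").pivot("indicator_id").agg(first("value"))`; the pandas analogue, using the question's example data:

```python
import pandas as pd

countryKPI = pd.DataFrame({'country_id3': ['ABC', 'POL', 'ABC', 'POL'],
                           'indicator_id': ['a', 'a', 'b', 'b'],
                           'value': [7, 8, 9, 7]})

# 'first' picks the single value per cell instead of averaging it.
by_first = pd.pivot_table(countryKPI, index='country_id3',
                          columns='indicator_id', values='value',
                          aggfunc='first')

# The default aggfunc ('mean') yields the same numbers here, only as floats.
by_mean = pd.pivot_table(countryKPI, index='country_id3',
                         columns='indicator_id', values='value')

print((by_first == by_mean).all().all())  # True
```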


I am also looking for pivot functionality without aggregation. @Georg Heiler, did you find anything in the meantime? – user3560220


Only what is posted here, unfortunately. –