2016-05-31 81 views
0

我在使用python将spark结构的RDD转换为spark中的数据框时遇到困难。将变化的元组的RDD转换为Spark中的DataFrame

df1=[['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),(itm22,6)]] 

转换后,我的数据帧应该如下所示:

 usr1 usr2 
itm1 2.0 NaN 
itm2 NaN 3.0 
itm22 NaN 6.0 
itm3 3.0 5.0 

我最初想coverting上述RDD结构如下:

df1={'usr1': {'itm1': 2, 'itm3': 3}, 'usr2': {'itm2': 3, 'itm3': 5, 'itm22':6}} 

然后使用Python的大熊猫模块pand=pd.DataFrame(dat2),然后使用spark_df = context.createDataFrame(pand)将pandas数据帧转换回火花数据帧。但是,我相信,通过这样做,我将RDD转换为非RDD对象,然后转换回RDD,这是不正确的。有些人可以帮我解决这个问题吗?

+0

这是怎么回事,不包括列选择,[从你以前的问题](http://stackoverflow.com/q/37514344/1560062)? – zero323

+0

请注意,在我之前的问题中,我更关心处理同一用户的重复“itms”(请参阅​​“如果在上面的元组中有多个计数字段,即('itm1',3)如何合并(或添加)这个值3到列联表(或实体 - 项目矩阵)的最终结果中。“由于给出的答案仍然不清楚(至少从我的角度来看),如果我能够为此得到一个解决方案问题,我可以关闭对我以前的问题的答案。 – Rkz

回答

2

有了这样的数据:

rdd = sc.parallelize([ 
    ['usr1',('itm1',2),('itm3',3)], ['usr2',('itm2',3), ('itm3',5),('itm22',6)] 
]) 

拼合记录:

def to_record(kvs): 
    user, *vs = kvs # For Python 2.x use standard indexing/splicing 
    for item, value in vs: 
     yield user, item, value 

records = rdd.flatMap(to_record) 

转换到DataFrame

df = records.toDF(["user", "item", "value"]) 

支点:

result = df.groupBy("item").pivot("user").sum() 

result.show() 
## +-----+----+----+ 
## | item|usr1|usr2| 
## +-----+----+----+ 
## | itm1| 2|null| 
## | itm2|null| 3| 
## | itm3| 3| 5| 
## |itm22|null| 6| 
## +-----+----+----+ 

备注:Spark DataFrames旨在处理长时间和相对较薄的数据。如果要生成广泛的应急表,DataFrames将不会有用,特别是在数据密集且您希望为每个功能保留单独列的情况下。

+0

完美的非常感谢!并且感谢那些额外的信息。 – Rkz

相关问题