2017-03-17 106 views
0

我有一个火花数据帧是这样的:如何扁平pySpark数据框?

id | Operation | Value | 
-------------------------- 
1 | Date_Min | 148590 | 
1 | Date_Max | 148590 | 
1 | Device | iphone | 
2 | Date_Min | 148590 | 
2 | Date_Max | 148590 | 
2 | Review | Good | 
3 | Date_Min | 148590 | 
3 | Date_Max | 148590 | 
3 | Review | Bad | 
3 | Review | samsung| 

我使用的Spark 2.1.0与pyspark:

id |   Operation     |  Value 
----------------------------------------------------------- 
1 | [Date_Min, Date_Max, Device]   | [148590, 148590, iphone]  
2 | [Date_Min, Date_Max, Review]   | [148590, 148590, Good]  
3 | [Date_Min, Date_Max, Review, Device] | [148590, 148590, Bad,samsung]  

,我期待resul。我试过这个solution,但它只适用于一列。

感谢

+0

我仍然无法找出好办法做到这一点特定任务。我试图分开展开列'df1 = df.select('id',explode(col(“Operation”)))', 'df2 = df.select('id',explode(col(“Value”)) )'。但是,如何将两个数据框水平地堆叠在一起没有很好的解决方案。 – titipata

回答

0

下面是一个例子数据帧从上面。我使用这个solution为了解决你的问题。

df = spark.createDataFrame(
    [[1, ['Date_Min', 'Date_Max', 'Device'], ['148590', '148590', 'iphone']], 
     [2, ['Date_Min', 'Date_Max', 'Review'], ['148590', '148590', 'Good']],  
     [3, ['Date_Min', 'Date_Max', 'Review', 'Device'], ['148590', '148590', 'Bad', 'samsung']]], 
    schema=['id', 'l1', 'l2']) 

在这里,您可以定义udf先为每行压缩两个列表。

from pyspark.sql.types import * 
from pyspark.sql.functions import col, udf, explode 

zip_list = udf(
    lambda x, y: list(zip(x, y)), 
    ArrayType(StructType([ 
     StructField("first", StringType()), 
     StructField("second", StringType()) 
    ])) 
) 

最后,您可以将两列压缩在一起,然后展开该列。

df_out = df.withColumn("tmp", zip_list('l1', 'l2')).\ 
    withColumn("tmp", explode("tmp")).\ 
    select('id', col('tmp.first').alias('Operation'), col('tmp.second').alias('Value')) 
df_out.show() 

输出

+---+---------+-------+ 
| id|Operation| Value| 
+---+---------+-------+ 
| 1| Date_Min| 148590| 
| 1| Date_Max| 148590| 
| 1| Device| iphone| 
| 2| Date_Min| 148590| 
| 2| Date_Max| 148590| 
| 2| Review| Good| 
| 3| Date_Min| 148590| 
| 3| Date_Max| 148590| 
| 3| Review| Bad| 
| 3| Device|samsung| 
+---+---------+-------+ 
+0

谢谢!它确实很好。 – Omar14

+0

没问题@ Omar14! – titipata

+0

最后,我仍然有一个函数zip_list的问题。当我使用Zeppelin笔记本时,它可以工作,但是当我尝试使用spark-submit自动执行作业和脚本时,作业失败,出现此错误: ''zip argument#1 must support iteration' – Omar14

-1

如果使用数据帧,然后试试这个: -

import pyspark.sql.functions as F 

your_df.select("id", F.explode("Operation"), F.explode("Value")).show() 
+0

当我在同一时间爆炸2列时,它不起作用。 – titipata