2017-09-14 36 views
1

我是Spark新手,需要从单行生成多行和多列。如何在pyspark中从单行添加多行和多列?

输入:

col1 col2 col3 col4 

输出

col1 col2 col3 col4 col5 col6 col7 

col1 col2 col3 col4 col8 col9 col10 

Logics for new columns: 

**col5 :** 

if col1==0 and col3!=0: 
    col5 = col4/col3 

else: 
    col5 = 0 


**col6 :** 

if col1==0 and col4!=0: 
    col6 = (col3*col4)/col1 

else: 
    col6 = 0 

For first row col7 holds same value as col2 

**col8 :** 

if col1!=0 and col3!=0: 
    col8 = col4/col3 

else: 
    col8 = 0 
**col9 :** 

if col1!=0 and col4!=0: 
    col9 = (col3*col4)/col1 

else: 
    col9 = 0 

For second row col10 = col2+ "_NEW" 

在结束 '和' 功能需要由与组施加。一旦我们转换上述结构,希望这会很容易。

google中的大多数文章解释了如何使用“withcolumn”选项而不是多列来将单列添加到现有数据框。文章没有解释这种情况。所以我想请求你的帮助。

回答

0

有几个选项:

  1. 使用withColumn多次,因为你需要(即你需要多少列添加)数据帧
  2. 使用map解析柱,以适当的列返回然后创建DataFrame。
2

希望这有助于!

from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show() 

输出是:

+----+----+----+----+----+----+-----+ 
|col1|col2|col3|col4|col5|col6| col7| 
+----+----+----+----+----+----+-----+ 
| 1| 2| 3| 4| 0.0| 0.0| 2| 
| 5| 6| 7| 8| 0.0| 0.0| 6| 
| 1| 2| 3| 4|1.33|12.0|2_NEW| 
| 5| 6| 7| 8|1.14|11.2|6_NEW| 
+----+----+----+----+----+----+-----+ 


不要忘了让我们知道是否能解决你的问题:)

+0

它完美地工作。谢谢 !如果可能的话,你能否给我建议一些学习材料,如书籍/论坛或在pyspark中熟悉的想法。 – user3150024

+0

很高兴它帮助!最好的地方是参考['Programming Guides'section](http://spark.apache.org/docs/latest/index.html)。 (顺便说一句 - 如果你喜欢这个答案,那么你应该[把它投票/标记为正确答案]) – Prem

+0

相同的查询在pyspark shell中执行得更快,但CLI 。我的源数据以ORC格式存在于Hive中。任何建议请 – user3150024