1

我已经用group和by函数写了pyspark代码。由于团队的影响,我感觉性能受到影响。相反,我想使用reducebykey。但我对这个领域很陌生。请找我的情况之下,如何在pyspark数据框中将groupby转换为reducebykey?

第1步:阅读蜂巢表连接查询数据直通sqlcontext,并存储在数据帧

第二步:输入总列数是15.在这5个重点领域和其余是数字值。

第3步:除了上面的输入列之外,还有几列需要从数字列导出。几列有默认值。

第4步:我已经使用了group by和sum函数。如何使用map和reducebykey选项以spark方式执行类似的逻辑。

from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
#final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")).show()from pyspark.sql.functions import col, when, lit, concat, round, sum 

#sample data 
df = sc.parallelize([(1, 2, 3, 4), (5, 6, 7, 8)]).toDF(["col1", "col2", "col3", "col4"]) 

#populate col5, col6, col7 
col5 = when((col('col1') == 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col6 = when((col('col1') == 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col7 = col('col2') 
df1 = df.withColumn("col5", col5).\ 
    withColumn("col6", col6).\ 
    withColumn("col7", col7) 

#populate col8, col9, col10 
col8 = when((col('col1') != 0) & (col('col3') != 0), round(col('col4')/ col('col3'), 2)).otherwise(0) 
col9 = when((col('col1') != 0) & (col('col4') != 0), round((col('col3') * col('col4'))/ col('col1'), 2)).otherwise(0) 
col10= concat(col('col2'), lit("_NEW")) 
df2 = df.withColumn("col5", col8).\ 
    withColumn("col6", col9).\ 
    withColumn("col7", col10) 

#final dataframe 
final_df = df1.union(df2) 
final_df.show() 

#groupBy calculation 
final_df.groupBy("col1", "col2", "col3", "col4").agg(sum("col5")........sum("coln")).show() 
+0

[DataFrame groupBy行为/优化]的可能重复(https://stackoverflow.com/questions/32902982/dataframe-groupby-behaviour-optimization) – user8371915

回答

1

Spark SQL中没有reduceByKey

groupBy +聚合函数将与RDD.reduceByKey几乎相同。 Spark会自动选择是否应该类似于RDD.groupByKey(即对于collect_list)或对RDD.reduceByKey

Dataset.groupBy +聚合函数的性能应该优于或等于RDD.reduceByKey。催化剂优化器注意如何在后台进行聚合

+0

至于I请记住,它只会在执行程序中添加额外的最终聚合步骤,而不是Spark SQL groupBy +聚合中的驱动程序。 –

+0

谢谢你的回复。我们不能在数据框上应用reduceByKey吗?很多文章告诉reduceByKey相同的东西比群组更快,因为大组数据集通过减少最后阶段的行数来减少。 – user3150024

+0

@ user3150024这些文章是关于RDD的 - 数据集有一个抽象层,Catalyst优化器优化查询:) –

相关问题