0

我想循环遍历Spark程序中的数据帧列并计算最小值和最大值。 我是Spark和Scala的新手,一旦我在数据框中获取它,就无法迭代列。在火花数据框中遍历列并计算最小最大值

我已经尝试运行下面的代码,但它需要传递给它的列号,问题是我如何从数据框中获取它并动态传递它并将结果存储在集合中。

val parquetRDD = spark.read.parquet("filename.parquet") 

parquetRDD.collect.foreach ({ i => parquetRDD_subset.agg(max(parquetRDD(parquetRDD.columns(2))), min(parquetRDD(parquetRDD.columns(2)))).show()}) 

感谢这方面的帮助。

回答

1

您不应该迭代行或记录。您应该使用汇总功能

import org.apache.spark.sql.functions._ 
val df = spark.read.parquet("filename.parquet") 
val aggCol = col(df.columns(2)) 
df.agg(min(aggCol), max(aggCol)).show() 

首先,当您执行spark.read.parquet时,您正在读取数据帧。 接下来我们使用col函数定义我们想要使用的列。 col函数将列名转换为列。您可以改为使用df(“name”),其中name是该列的名称。

agg函数使用聚合列,因此min和max是聚合函数,它们获取列并返回具有聚合值的列。

更新

根据该意见,目的是最大和最小的所有列。因此,你可以这样做:

val minColumns = df.columns.map(name => min(col(name))) 
val maxColumns = df.columns.map(name => max(col(name))) 
val allMinMax = minColumns ++ maxColumns 
df.agg(allMinMax.head, allMinMax.tail: _*).show() 

你也可以简单地做:

df.describe().show() 

,让你的所有列,包括最小值,最大值,平均值,计数和STDDEV

+0

感谢阿萨夫统计您的响应。但是在val中,aggCol = col(df.columns(2))实际上并没有手动传递列号(在这种情况下为2)。有没有一种方法可以动态地传递它,以便我可以在循环中逐个遍历列并生成最小最大值。谢谢。 – sabby

+0

谢谢阿萨夫!它确实有帮助,但是这可以写成循环,以便我不需要手动传递列名。当我说我的问题迭代时,我的意思是逐个循环遍历列。 在下面的例子中,我们有三列,我会动态地选择每一列,计算它的最小最大值,而不需要 必须手动传递列名。假定col1,col2,col3的任意随机值为 col1 col2 col3 – sabby

相关问题