2017-08-12 46 views
1

我有格式计算总和,多个顶部的K值的计数火花

+---------------------------------+ 
|name| values |score |row_number| 
+---------------------------------+ 
|A |1000 |0  |1  | 
|B |947 |0  |2  | 
|C |923 |1  |3  | 
|D |900 |2  |4  | 
|E |850 |3  |5  | 
|F |800 |1  |6  | 
+---------------------------------+ 

我需要获得总和(值)时评分> 0和ROW_NUMBER < K(I,E)SUM的输入数据帧当数据框中的前k个值得分> 0时,所有值的总和。

我可以通过运行以下查询百强值

val top_100_data = df.select(
     count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"), 
     sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"), 
     sum(when(col("row_number") <=100, col(values))).alias("total_sum_100") 
    ) 

不过,我需要为高档100,200,300 ...... 2500取数据,以实现这一目标。这意味着我需要运行这个查询25次,最后结合25个数据帧。

我是新来的火花,还有很多东西。解决这个问题最好的办法是什么?

谢谢!

回答

1

您可以创建一个限制为Array

val topFilters = Array(100, 200, 300) // you can add more 

然后你就可以通过topFilters阵列循环,并在您需要的dataframe我建议你使用join而不是union,因为join会给你单独的columnsunions会给你单独的rows。你可以做以下

鉴于你dataframe作为

+----+------+-----+----------+ 
|name|values|score|row_number| 
+----+------+-----+----------+ 
|A |1000 |0 |1   | 
|B |947 |0 |2   | 
|C |923 |1 |3   | 
|D |900 |2 |200  | 
|E |850 |3 |150  | 
|F |800 |1 |250  | 
+----+------+-----+----------+ 

您可以通过使用作为

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 
var finalDF : DataFrame = Seq("1").toDF("rowNum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k")) 
    finalDF = finalDF.join(top_100_data, Seq("rowNum")) 
} 
finalDF.show(false) 

上面定义的topFilters阵列应该给你最后dataframe作为

+------+-------------+-------------+-------------+ 
|rowNum|total_sum_100|total_sum_200|total_sum_300| 
+------+-------------+-------------+-------------+ 
|1  |923   |1773   |3473   | 
+------+-------------+-------------+-------------+ 

您可以为你的25个限制做同样的事情。

如果您打算使用union,那么这个想法与上述类似。

我希望答案是有帮助的

更新

如果您需要工会,那么你可以申请以下逻辑上面

var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"), 
    sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"), 
    sum(when(col("row_number") <=k, col("values"))).alias("total_sum")) 
    finalDF = finalDF.union(top_100_data) 
} 
finalDF.filter(col("limit") =!= 0).show(false) 

定义相同限制阵列应该给你

+-----+-----+------------+---------+ 
|limit|count|sum_filtered|total_sum| 
+-----+-----+------------+---------+ 
|100 |1 |923   |2870  | 
|200 |3 |2673  |4620  | 
|300 |4 |3473  |5420  | 
+-----+-----+------------+---------+ 
+0

嗨!谢谢你的回答,这非常有帮助!!所以我会每K需要3列(sum_100_filtered_score,total_sum_100,count_filtered_score_100)。通过加入数据集,我为每个字段获得一列。这就是为什么我试图使用工会 – Vignesh

+0

然后去。 :)而不是加入你可以使用联合。 –

+0

看到我更新的答案:)你可以接受和upvote,如果它真的帮助你 –