3

通常,组中的所有行都被传递给聚合函数。我想使用一个条件来过滤行,以便只有组中的某些行被传递给一个聚合函数。这种操作可能与PostgreSQL。我想用Spark SQL DataFrame(Spark 2.0.0)做同样的事情。如何使用spark sql筛选特定聚合的行?

代码可能看起来是这样的:

val df = ... // some data frame 
df.groupBy("A").agg(
    max("B").where("B").less(10), // there is no such method as `where` :(
    max("C").where("C").less(5) 
) 

因此,对于这样一个数据帧:

| A | B | C | 
| 1| 14| 4| 
| 1| 9| 3| 
| 2| 5| 6| 

其结果将是:

|A|max(B)|max(C)| 
|1| 9|  4| 
|2| 5| null| 

是否有可能与Spark SQL?

请注意,通常可以使用除max以外的任何其他聚合函数,并且可以在具有任意过滤条件的同一列上使用多个聚合。

+0

我'首先用null或NaN替换超出限制的所有值,然后我将groupBy和聚合。 –

+0

这适用于这种特殊情况,但如果在具有不同过滤条件的同一列上有多个聚合,它将不起作用。 –

回答

0
>>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF() 
    >>> df.registerTempTable('t') 
    >>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a") 

    +---+---+----+ 
    | a| mb| mc| 
    +---+---+----+ 
    | 1| 9| 3| 
    | 2| 5|null| 
    +---+---+----+ 

可以使用SQL(我相信你在Postgres的做同样的事情?)

0
df.groupBy("name","age","id").agg(functions.max("age").$less(20),functions.max("id").$less("30")).show(); 

样本数据:

name age id 
abc  23 1001 
cde  24 1002 
efg  22 1003 
ghi  21 1004 
ijk  20 1005 
klm  19 1006 
mno  18 1007 
pqr  18 1008 
rst  26 1009 
tuv  27 1010 
pqr  18 1012 
rst  28 1013 
tuv  29 1011 
abc  24 1015 

输出:

+----+---+----+---------------+--------------+ 
|name|age| id|(max(age) < 20)|(max(id) < 30)| 
+----+---+----+---------------+--------------+ 
| rst| 26|1009|   false|   true| 
| abc| 23|1001|   false|   true| 
| ijk| 20|1005|   false|   true| 
| tuv| 29|1011|   false|   true| 
| efg| 22|1003|   false|   true| 
| mno| 18|1007|   true|   true| 
| tuv| 27|1010|   false|   true| 
| klm| 19|1006|   true|   true| 
| cde| 24|1002|   false|   true| 
| pqr| 18|1008|   true|   true| 
| abc| 24|1015|   false|   true| 
| ghi| 21|1004|   false|   true| 
| rst| 28|1013|   false|   true| 
| pqr| 18|1012|   true|   true| 
+----+---+----+---------------+--------------+ 
+0

这实际上并没有回答原来的问题。这只是在聚合之后提供额外的运算符,而不是之前的过滤。 –

4
val df = Seq((1,14,4),(1,9,3),(2,5,6)).toDF("a","b","c") 

val agg = df.groupBy("a").agg(max(when($"b" < 10, $"b")).as("MaxB"), max(when($"c" < 5, $"c")).as("MaxC")) 

agg.show 
+1

如果你解释你在这里做什么,这将是很好的 – MZaragoza