2017-08-30 115 views
1

是否有一种干净的方式来计算Spark Dataframe上的移动百分位数。在Spark Dataframe上移动百分位数

我有一个巨大的数据框,我每隔15分钟汇总一次,我想计算每个部分的百分位数。

df.groupBy(window(col("date").cast("timestamp"), "15 minutes")) 
    .agg(sum("session"),mean("session"),percentile_approx("session", 0.5)) 
    .show() 

错误:未发现:价值percentile_approx

所以我要像计算总和与平均值基本的东西,但我需要计算中位数和其他一些百分点。

在Spark 2.1中有这样做的有效方法吗?

因为在这里,没有平均percentile_approx,似乎API中实现Percentile_approx功能。

我看到这个问题已经被问到,但答案并不是都同意一个独特的解决方案。对我来说这很模糊......所以我想知道在2017年8月,是否有一个好的和有效的解决方案。

而当我浏览15分钟的窗口时,我想知道如果仅仅用硬计算它不会工作而不是近似值?

非常感谢您的关注,

祝大家下午好! PS:Scala或PySpark我不介意,两者都会更大!

+0

示例代码中的“window”是什么?你想要窗口函数(和滑动窗口)还是非重叠窗口(groupBy)? –

+0

感谢您的回答,并花时间回答我的问题!我有历史数据,我想每1分钟汇总一次。每分钟,我有数百条记录,并在每个滑动窗口(每分钟)我需要计算中位数等...所以我想知道什么是干净的方式来有效地做到这一点 – tricky

+0

好吧,但在这种情况下窗口并不真正“滑动”......因为对于滑动窗口,您需要窗口函数。AFAIK滑动意味着你的情况:对于每个记录,采取“周围”15分钟的数据并计算聚合 –

回答

1

好了,所以我是非常愚蠢的,我猜。

我不得不callUDF添加到我以前的想法:percentile_approx。对不起,我不同意

callUDF("percentile_approx", col("session"), lit(0.5)) 

因此,例如,在我的情况,我想聚合每分钟2个月历史数据集:

df.groupBy(window((col("date")/1000).cast("timestamp"), "1 minutes")) 
.agg(sum("session"),mean("session"),callUDF("percentile_approx", col("session"), lit(0.5))) 
.show() 

(在milisecond时间戳因此/1000

+0

多数民众赞成在很大程度上,我不知道你可以使用百分比作为聚合功能! –

+0

只是为了澄清:percentile_approx(你也可以使用percentile)是一个内置的HIVE UDAF(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF) )),所以它不是由Spark实现的,而是由Hive实现的(如果你有hive-support(或hiveContext),你只能使用它。 –

1

如果你不需要滑动(重叠)窗口,你可以用groupBy来完成。据我所知,没有百分聚合功能,让你无论是要实现自己的UDAF或用下面的办法:

val df = (1 to 100).map(i => (
    i/10, scala.util.Random.nextDouble) 
).toDF("time","session") 

val calcStats = udf((data:Seq[Double]) => { 
    (data.sum, 
    data.sum/data.size, 
    data.sorted.apply(data.size/2) // is ~ median, replace with your desired logic 
) 
}) 

df.groupBy($"time") 
    .agg(collect_list($"session").as("sessions")) 
    .withColumn("stats",calcStats($"sessions").cast("struct<sum:double,mean:double,median:double>")) 
    .select($"time",$"stats.*") 
    .orderBy($"time") 
    .show 

+----+------------------+-------------------+-------------------+ 
|time|    sum|    mean|    median| 
+----+------------------+-------------------+-------------------+ 
| 0|3.5441618790222287| 0.3937957643358032| 0.3968893251191352| 
| 1|3.6612518806543757| 0.3661251880654376| 0.4395039388994335| 
| 2| 4.040992655970037|0.40409926559700365| 0.3522214051715915| 
| 3| 4.583175830988081| 0.4583175830988081| 0.5800394949546751| 
| 4| 3.849409207658501| 0.3849409207658501|0.43422232330495936| 
| 5| 5.514681139649785| 0.5514681139649784| 0.6703416471647694| 
| 6| 4.890227540935781| 0.4890227540935781| 0.5515164635420178| 
| 7|4.1148083531280095|0.41148083531280094| 0.4384132796986667| 
| 8| 5.723834881155167| 0.5723834881155166| 0.6415902834329499| 
| 9| 5.559212938582014| 0.5559212938582014| 0.6816268800227596| 
| 10|0.8867335786067405| 0.8867335786067405| 0.8867335786067405| 
+----+------------------+-------------------+-------------------+ 
+0

感谢您的帮助!你的回答帮助我更好地理解UDF,祝你有美好的一天! – tricky