2016-01-25 134 views
1

让说我有一个数据帧(存储在斯卡拉VAL为df),其中包含从CSV数据:火花(斯卡拉)数据帧过滤(FIR)

time,temperature 
0,65 
1,67 
2,62 
3,59 

,我有没有问题,从文件中读取该作为scala语言的火花数据框。

我想补充一个过滤柱(由过滤器我的意思是信号处理的移动平均滤波),(说我想要做(T[n]+T[n-1])/2.0):

time,temperature,temperatureAvg 
0,65,(65+0)/2.0 
1,67,(67+65)/2.0 
2,62,(62+67)/2.0 
3,59,(59+62)/2.0 

(其实,说第一行,我想要32.5而不是(65+0)/2.0。我写了它来澄清预期的2-time-step过滤操作输出)

那么如何实现这个呢?我不熟悉的火花数据帧操作沿柱反复结合的行...

回答

5

星火2.0+

火花2.0及更高版本,可以使用window功能为groupBy一个输入。它允许您指定windowDurationslideDurationstartTime(偏移量)。它只适用于TimestampType列,但不难找到解决方法。你的情况,这将需要一些额外的步骤来纠正界限,而通用的解决方案可以表述为如下图所示:

import org.apache.spark.sql.functions.{window, avg} 

df 
    .withColumn("ts", $"time".cast("timestamp")) 
    .groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second")) 
    .avg("temperature") 

星火< 2.0

如果分割你的数据,你可以很自然地使用窗口功能如下:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.mean 

val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0) 

val df = sc.parallelize(Seq(
    (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59) 
)).toDF("id", "time", "temperature") 

df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show 

// +---+----+-----------+--------------+        
// | id|time|temperature|temperatureAvg| 
// +---+----+-----------+--------------+ 
// | 1| 0|   65|   65.0| 
// | 1| 1|   67|   66.0| 
// | 1| 2|   62|   64.5| 
// | 1| 3|   59|   60.5| 
// +---+----+-----------+--------------+ 

可以使用lead/lag功能创建任意砝码窗口:

lit(0.6) * $"temperature" + 
lit(0.3) * lag($"temperature", 1) + 
lit(0.2) * lag($"temperature", 2) 

不存在partitionBy条款,但仍然是可能的,但效率极低。如果是这种情况,您将无法使用DataFrames。相反,您可以在RDD上使用sliding(请参阅Operate neighbor elements in RDD in Spark)。还有spark-timeseries包可能会发现有用。

+0

谢谢零!它工作得很好。一个相关的问题,如果我想做过滤而不是'0.5T [n] + 0.5T [n-1]'而想要'0.6T [n] + 0.3T [n-1] + 0.1T [n -2]'其中'T [n]'是第n行的温度? –

+1

而不是“平均”使用“滞后”和“铅”。 – zero323