2017-08-21 162 views
5

我有一个由时间戳列和美元列组成的数据集。我希望找到以每行时间戳结束的每周平均美元数。我最初是在查看pyspark.sql.functions.window函数,但是按星期计算数据。pyspark:使用时间序列数据滚动平均值

下面是一个例子:

%pyspark 
import datetime 
from pyspark.sql import functions as F 

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) 
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp')) 

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) 
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

这将导致两个记录:

|  start  |   end   | avg | 
|---------------------|----------------------|-----| 
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| 
|---------------------|----------------------|-----| 
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| 
|---------------------|----------------------|-----| 

窗口功能分级的时间序列数据,而不是执行滚动平均值。

有没有办法执行一个滚动平均值,我将得到每行的周平均值,并在该行的timestampGMT处结束一段时间?

编辑:

张以下的答案是接近我想要的,但我想看看没有什么。

这里有一个更好的例子来说明什么,我想要知道的:

%pyspark 
from pyspark.sql import functions as F 
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

这将导致以下数据帧:

dollars timestampGMT   rolling_average 
25  2017-03-18 11:27:18.0 25 
17  2017-03-10 15:27:18.0 15 
13  2017-03-15 12:27:18.0 15 

我想一般是在在时间戳GMT列中继续执行日期,这将导致:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17 
13  2017-03-15 12:27:18.0 15 
25  2017-03-18 11:27:18.0 19 

在上面的结果中因为没有前面的记录,所以2017-03-10的rolling_average为17。 2017-03-15的rolling_average值为15,因为它在2017-03-15的平均值为13,而2017-03-10的平均值为17,这与前7天的窗口一致。 2017-03-18的平均移动平均数为19,因为2017-03-18平均数为25,2017-03-10平均数为13,与前7天的数据一致,2017年不包括17 -03-10,因为这与前7天窗口不一致。

有没有办法做到这一点,而不是每周窗口不重叠的装仓窗口?

回答

4

我想通了正确的方法使用该计算器计算移动/滚动平均值:

Spark Window Functions - rangeBetween dates

基本想法是将您的时间戳列转换为secon ds,然后可以使用pyspark.sql.Window类中的rangeBetween函数在窗口中包含正确的行。

下面是解决例如:

%pyspark 
from pyspark.sql import functions as F 

#function to calculate number of seconds from number of days 
days = lambda i: i * 86400 

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"), 
         (13, "2017-03-15T12:27:18+00:00"), 
         (25, "2017-03-18T11:27:18+00:00")], 
         ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 

#create window by casting timestamp to long (number of seconds) 
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)) 

df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

这导致移动平均数的确切列,我一直在寻找:

dollars timestampGMT   rolling_average 
17  2017-03-10 15:27:18.0 17.0 
13  2017-03-15 12:27:18.0 15.0 
25  2017-03-18 11:27:18.0 19.0 
1

你的意思是这样的:

df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"), 
          (13, "2017-03-11T12:27:18+00:00"), 
          (21, "2017-03-17T11:27:18+00:00")], 
          ["dollars", "timestampGMT"]) 
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) 
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days")))) 

输出:

+-------+-------------------+---------------+         
|dollars|timestampGMT  |rolling_average| 
+-------+-------------------+---------------+ 
|21  |2017-03-17 19:27:18|21.0   | 
|17  |2017-03-11 23:27:18|15.0   | 
|13  |2017-03-11 20:27:18|15.0   | 
+-------+-------------------+---------------+ 
+0

感谢张,更接近我想要的东西,但不正是我想要的。您的代码仍然通过日期分箱计算答案。我希望每周的平均水平在该行日期结束。没有做出一个好榜样是我的错。我将用一个更新的示例来编辑我的帖子,以显示我想要的内容。 –