2016-07-06 23 views
4

我的原始数据以表格格式显示。它包含来自不同变量的观测值。每个观察都带有变量名称,时间戳和当时的值。PySpark:检索数据帧内组的平均值和数值的计数

变量[字符串],时间[日期时间],值[浮子]

的数据被存储作为镶木在HDFS并装入火花数据帧(DF)。从那个数据帧。

现在我想计算每个变量的默认统计量,如平均值,标准偏差等。之后,一旦Mean被检索出来,我想过滤/计算那些非常接近Mean的变量值。

因此我需要首先得到每个变量的均值。这就是为什么我使用GroupBy来获取每个变量的统计信息(而不是整个数据集)。

df_stats = df.groupBy(df.Variable).agg(\ 
    count(df.Variable).alias("count"), \ 
    mean(df.Value).alias("mean"), \ 
    stddev(df.Value).alias("std_deviation")) 

使用每个变量的均值我可以过滤那些特定变量的均值附近的值(只是计数)。因此我需要该变量的所有观察值(值)。这些值位于原始数据帧df中,而不在聚合/分组数据帧df_stats中。

creating statistics

最后我想一个数据帧像聚合/分组df_stats一个新列“count_around_mean”

我想使用df_stats.map(...)或df_stats.join(df,df.Variable)。但是我卡上的红色箭头:(

提问:你会如何意识到

临时解决方案:同时我使用的是基于您的想法的解决方案但对于STDDEV范围2和3的范围,功能无法正常工作。它总是产生一个

AttributeError的话说零式无_jvm

from pyspark.sql.window import Window 
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

w1 = Window().partitionBy("Variable") 
w2 = Window.partitionBy("Variable").orderBy("Time") 

def stddev_pop_w(col, w): 
    #Built-in stddev doesn't support windowing 
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2)) 

def isInRange(value, mean, stddev, radius): 
    try: 
     if (abs(value - mean) < radius * stddev): 
      return 1 
     else: 
      return 0 
    except AttributeError: 
     return -1 

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long") 
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType()) 
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType()) 
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType()) 

df \ 
    .withColumn("mean", mean("Value").over(w1)) \ 
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \ 
    .withColumn("delta", delta) 
    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \ 
    .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \ 
    .show(5, False) 

#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \ 
+0

至少对我而言,这并不清楚你在这里真正想要什么。你是什​​么意思_完整的时间系列_?你想要应用什么样的功能? – zero323

+0

为了更好地理解,我重新编写了原始问题。同时我找到了一个很好的方法,我明天需要测试。然后我会在这里上传这个想法。 – Matthias

回答

2

星火2.0+

你可以用一个取代stddev_pop_w内置pyspark.sql.functions.stddev*功能。

星火< 2.0

一般来说,没有必要与加盟聚集。相反,您可以计算统计数据,而无需使用窗口函数折叠行。假设你的数据是这样的:

import numpy as np 
import pandas as pd 
from pyspark.sql.functions import mean 

n = 10000 
k = 20 

np.random.seed(100) 

df = sqlContext.createDataFrame(pd.DataFrame({ 
    "id": np.arange(n), 
    "variable": np.random.choice(k, n), 
    "value": np.random.normal(0, 1, n) 
})) 

您可以通过variable定义与分区窗口:

from pyspark.sql.window import Window 

w = Window().partitionBy("variable") 

和计算统计如下:

from pyspark.sql.functions import avg, pow, sqrt 

def stddev_pop_w(col, w): 
    """Builtin stddev doesn't support windowing 
    You can easily implement sample variant as well 
    """ 
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2)) 


(df 
    .withColumn("stddev", stddev_pop_w(col("value"), w)) 
    .withColumn("mean", avg("value").over(w)) 
    .show(5, False)) 

## +---+--------------------+--------+------------------+--------------------+ 
## |id |value    |variable|stddev   |mean    | 
## +---+--------------------+--------+------------------+--------------------+ 
## |47 |0.77212446947439 |0  |1.0103781346123295|0.035316745261099715| 
## |60 |-0.931463439483327 |0  |1.0103781346123295|0.035316745261099715| 
## |86 |1.0199074337552294 |0  |1.0103781346123295|0.035316745261099715| 
## |121|-1.619408643898953 |0  |1.0103781346123295|0.035316745261099715| 
## |145|-0.16065930935765935|0  |1.0103781346123295|0.035316745261099715| 
## +---+--------------------+--------+------------------+--------------------+ 
## only showing top 5 rows 

就比较聚集体加入:

from pyspark.sql.functions import stddev, avg, broadcast 

df.join(
    broadcast(df.groupBy("variable").agg(avg("value"), stddev("value"))), 
    ["variable"] 
) 
+1

一如既往的好和聪明的答案。谢谢!明天我会试试。我也在考虑使用Window函数来获得两行之间的时间差(滞后),并且还指出q行是否属于均值附近的合理范围。 – Matthias

+0

我只是想知道如果调用柱/窗后两次是有效的?您可以使用groupBy/mapValues/StatCounter来一步获取这些值。结果表格被压缩(分组),但可以合并到原始表格中。 – Matthias

+0

您使用列调用的次数不会影响执行计划。如果你不喜欢窗口函数,你可能更喜欢聚集和加入。关于通过RDD,这意味着多重安全不安全的转换和繁重的IO + serde。 – zero323