我的原始数据以表格格式显示。它包含来自不同变量的观测值。每个观察都带有变量名称,时间戳和当时的值。PySpark:检索数据帧内组的平均值和数值的计数
变量[字符串],时间[日期时间],值[浮子]
的数据被存储作为镶木在HDFS并装入火花数据帧(DF)。从那个数据帧。
现在我想计算每个变量的默认统计量,如平均值,标准偏差等。之后,一旦Mean被检索出来,我想过滤/计算那些非常接近Mean的变量值。
因此我需要首先得到每个变量的均值。这就是为什么我使用GroupBy来获取每个变量的统计信息(而不是整个数据集)。
df_stats = df.groupBy(df.Variable).agg(\
count(df.Variable).alias("count"), \
mean(df.Value).alias("mean"), \
stddev(df.Value).alias("std_deviation"))
使用每个变量的均值我可以过滤那些特定变量的均值附近的值(只是计数)。因此我需要该变量的所有观察值(值)。这些值位于原始数据帧df中,而不在聚合/分组数据帧df_stats中。
最后我想一个数据帧像聚合/分组df_stats一个新列“count_around_mean”。
我想使用df_stats.map(...)或df_stats.join(df,df.Variable)。但是我卡上的红色箭头:(
提问:你会如何意识到
临时解决方案:同时我使用的是基于您的想法的解决方案但对于STDDEV范围2和3的范围,功能无法正常工作。它总是产生一个
AttributeError的话说零式无_jvm
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta)
.withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
.withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
.show(5, False)
#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \
至少对我而言,这并不清楚你在这里真正想要什么。你是什么意思_完整的时间系列_?你想要应用什么样的功能? – zero323
为了更好地理解,我重新编写了原始问题。同时我找到了一个很好的方法,我明天需要测试。然后我会在这里上传这个想法。 – Matthias