让我们先从一些数据,根据你的描述:
from pyspark.sql.functions import unix_timestamp
df = sc.parallelize([("2017-01-01 00:00:01", 2.0, 2.0, 2.0),
("2017-01-01 00:00:08", 9.0, 9.0, 9.0),
("2017-01-01 00:00:02", 3.0, 3.0, 3.0),
("2017-01-01 00:00:03", 4.0, 4.0, 4.0),
("2017-01-01 00:00:04", 5.0, 5.0, 5.0),
("2017-01-01 00:00:05", 6.0, 6.0, 6.0),
("2017-01-01 00:00:06", 7.0, 7.0, 7.0),
("2017-01-01 00:00:07", 8.0, 8.0, 8.0)]).toDF(["time","a","b","c"])
df = df.withColumn("time", unix_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
>星火2.0
我们可以使用ceil()
函数生成一个新的interval
柱子中,然后我们就可以组您的数据并将所有其他变量收集到一个平面列表中。
,以保证正确的排序产生的名单内,不论最初的订单,我们将使用Window
功能,通过date
分区数据,创建一个rank
列由time
有序。
from pyspark.sql.window import Window
from pyspark.sql.functions import ceil
df = df.withColumn("date", df["time"].cast("date")) \
.withColumn("interval", ((ceil(df["time"].cast("long")/3L))*3.0).cast("timestamp"))
window = Window.partitionBy(df['date']).orderBy(df['time'])
,因为我们将收集rank
柱转化为正确的排序嵌套表,我们将定义一个udf
最终解包嵌套列出了所有的值,但第一个,这是rank
:
def unnest(col):
l = [item[1:] for item in col]
res = [item for sublist in l for item in sublist]
return(res)
unnest_udf = udf(unnest)
现在我们把一切融合在一起:
from pyspark.sql.functions import rank
from pyspark.sql.functions import collect_list, array
df.select('*', rank().over(window).alias('rank')) \
.groupBy("interval") \
.agg(collect_list(array("rank","a", "b","c")).alias("vals")) \
.withColumn("vals", unnest_udf("vals")) \
.sort("interval") \
.show(truncate = False)
+---------------------+---------------------------------------------+
|interval |vals |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 7.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 8.0, 8.0, 9.0, 9.0, 9.0] |
+---------------------+---------------------------------------------+
星火1.6
我们不能用array
作为内部collect_list()
参数,所以我们只是包装内array
的collect_list()
电话,而不是周围的其他方法。我们还会稍微修改我们的udf
,因为我们不会明确需要使用此方法的rank
列。
unpack_udf = udf(
lambda l: [item for sublist in l for item in sublist]
)
df.select('*', rank().over(window).alias('rank')) \
.groupBy("interval") \
.agg(array(collect_list("a"),
collect_list("b"),
collect_list("c")).alias("vals")) \
.withColumn("vals", unpack_udf("vals")) \
.sort("interval") \
.show(truncate = False)
+---------------------+---------------------------------------------+
|interval |vals |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 3.0, 4.0, 2.0, 3.0, 4.0, 2.0, 3.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 6.0, 7.0, 5.0, 6.0, 7.0, 5.0, 6.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 9.0, 8.0, 9.0, 8.0, 9.0] |
+---------------------+---------------------------------------------+
注意vals
列现在下令以不同的方式,但一直由于我们前面定义的window
功能。
感谢您的回答,看起来不错,我会测试它。有一件事困扰我,是否有可能以这种方式失去功能,因为我之前没有对数据进行排序?例如,可以首先行是[A2,B1,C3,A1,B2,C2,A3,B3,C1],而第二个[B4,A4,C4,A5,C5,B5,A6,B6,C6],或类似的东西?特征的顺序非常重要,因为之后我将运行集群? – Marko
检查没有unpack_udf的结果,除非您另有指定,否则结果将始终为列的顺序。 – mtoto
我会这样做的,但是,我并不担心列的排序,而是原始行的顺序,如果你知道我的意思吗?由于数据不是按时间排序的,我们按照一天中的小时(窗口大小为60而不是3)对它进行分组,所以我如何确定结果行的列将具有相同的顺序,也就是说,所考虑的行相邻吗?是什么阻止a4行在a1和a3行之间而不是a2行? – Marko