2017-10-05 49 views
1

我正尝试在现有的一组列上使用groupby聚合在Pyspark中创建一个列的新列。一个示例的输入数据帧被提供如下:collect_list通过保留基于另一个变量的顺序

------------------------ 
id | date  | value 
------------------------ 
1 |2014-01-03 | 10 
1 |2014-01-04 | 5 
1 |2014-01-05 | 15 
1 |2014-01-06 | 20 
2 |2014-02-10 | 100 
2 |2014-03-11 | 500 
2 |2014-04-15 | 1500 

预期的输出是:

id | value_list 
------------------------ 
1 | [10, 5, 15, 20] 
2 | [100, 500, 1500] 

列表中的值是由日期排序。

我尝试使用collect_list如下:

from pyspark.sql import functions as F 
ordered_df = input_df.orderBy(['id','date'],ascending = True) 
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value")) 

但collect_list不保证顺序,即使我聚集之前日期排序输入数据帧。

有人可以帮助如何通过保留基于第二个(日期)变量的订单来进行聚合吗?

回答

6

如果您以列表形式收集日期和值,则可以使用和udf根据日期对结果列进行排序,然后仅保留结果中的值。对于具体的例子

import operator 
import pyspark.sql.functions as F 

# create list column 
grouped_df = input_df.groupby("id") \ 
       .agg(F.collect_list(F.struct("date", "value")) \ 
       .alias("list_col")) 

# define udf 
def sorter(l): 
    res = sorted(l, key=operator.itemgetter(0)) 
    return [item[1] for item in res] 

sort_udf = F.udf(sorter) 

# test 
grouped_df.select("id", sort_udf("list_col") \ 
    .alias("sorted_list")) \ 
    .show(truncate = False) 
+---+----------------+ 
|id |sorted_list  | 
+---+----------------+ 
|1 |[10, 5, 15, 20] | 
|2 |[100, 500, 1500]| 
+---+----------------+ 
+0

谢谢...我只是试图在几百万更大的数据和我收到完全相同的顺序为collect_list的...有没有办法来解释为什么这可能正在发生?此外,检查collect_list似乎只是混淆了在一个日期内具有多个值的情况...是否意味着collect_list还维护了订单? – Ravi

+1

在你的代码中,你需要在collect_list()之前排序整个数据集。但是这不是必须的,在收集列表中的日期和值之后对结果列表进行排序会更有效。 – mtoto

+0

只是澄清...排序列和使用collect_list排序列将保存顺序? – Ravi

相关问题