2016-04-26 35 views
1

我有一个非常长(几十亿行)和宽得多(几百列)的RDD。我想创建每列中唯一值的集合(这些集合不需要并行化,因为每列中包含的值不会超过500个)。在PySpark中为RDD中的每列找到不同的值

这是我到目前为止有:

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]]) 
num_columns = len(data.first()) 
empty_sets = [set() for index in xrange(num_columns)] 
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y))) 

我在做什么这里正试图initate的空集列表,一个在我的RDD每一列。对于聚合的第一部分,我想通过data逐行进行迭代,将列n中的值添加到集列表中的第n集。如果这个值已经存在,它就不会执行任何操作。然后,它在之后执行集合的union,以便在所有分区中只返回不同的值。

当我尝试运行这段代码,我得到以下错误:

AttributeError: 'list' object has no attribute 'add'

我认为问题是,我无法准确,清楚,我通过集列表迭代( empty_sets),并且我遍历data中每行的列。我相信在(lambda a, b: a.add(b))aempty_setsbdata.first()(整行,不是一个单一的值)。这显然不起作用,并不是我想要的聚合。

如何遍历我的列表集合,并通过我的数据框的每一行,将每个值添加到其对应的set对象?

所需的输出将如下所示:

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


PS我看了这个例子here,这是非常相似,我的使用情况下(这是在那里我得到了主意,用首先是aggregate)。但是,我发现代码很难转换成PySpark,我很不清楚代码正在做什么casezip

回答

1

有两个问题。一,你的组合函数假设每一行都是一个集合,但是你正在一组集合上进行操作。两个,add不返回任何东西(尝试a = set(); b = a.add('1'); print b),所以你的第一个组合函数返回一个列表None s。要解决这个问题,要使你的第一个组合器函数不是匿名的,并让它们都循环遍历集合列表:

def set_plus_row(sets, row): 
    for i in range(len(sets)): 
     sets[i].add(row[i]) 
    return sets 


unique_values_per_column = data.aggregate(
    empty_sets, 
    set_plus_row, # can't be lambda b/c add doesn't return anything 
    lambda x, y: [a.union(b) for a, b in zip(x, y)] 
) 

我不知道什么zip确实在Scala中,但在Python,它需要两个列表,并把每个对应元素一起放入元组(试行x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y);),这样你可以遍历同时两个列表。