我有一个非常长(几十亿行)和宽得多(几百列)的RDD。我想创建每列中唯一值的集合(这些集合不需要并行化,因为每列中包含的值不会超过500个)。在PySpark中为RDD中的每列找到不同的值
这是我到目前为止有:
data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))
我在做什么这里正试图initate的空集列表,一个在我的RDD每一列。对于聚合的第一部分,我想通过data
逐行进行迭代,将列n
中的值添加到集列表中的第n
集。如果这个值已经存在,它就不会执行任何操作。然后,它在之后执行集合的union
,以便在所有分区中只返回不同的值。
当我尝试运行这段代码,我得到以下错误:
AttributeError: 'list' object has no attribute 'add'
我认为问题是,我无法准确,清楚,我通过集列表迭代( empty_sets
),并且我遍历data
中每行的列。我相信在(lambda a, b: a.add(b))
那a
是empty_sets
和b
是data.first()
(整行,不是一个单一的值)。这显然不起作用,并不是我想要的聚合。
如何遍历我的列表集合,并通过我的数据框的每一行,将每个值添加到其对应的set对象?
所需的输出将如下所示:
[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]
PS我看了这个例子here,这是非常相似,我的使用情况下(这是在那里我得到了主意,用首先是aggregate
)。但是,我发现代码很难转换成PySpark,我很不清楚代码正在做什么case
和zip
。