
I have the following data, and what I want to do is something like a PySpark reduceByKey to aggregate by key/tuple:

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')] 

For each key, I want to count the occurrences of the value (a one-character string). So I first did a map:

.map(lambda x: (x[0], [x[1], 1])) 

which now gives key/tuples of:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])] 

I just can't figure out the last part: how to count, for each key, the occurrences of each letter. For example, key 13 would have one 'D' and one 'A', while 14 would have two 'T's, and so on.


You want to 'groupByKey' first, then perform the count on the grouped characters. – ohruunuruus

Answers


I'm more familiar with Spark in Scala, so there may be better ways than Counter to count the characters in the iterable produced by groupByKey, but here's one option:

from collections import Counter 

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
# group the characters for each key, then count them with a Counter 
rdd.groupByKey().mapValues(lambda x: Counter(x)).collect() 

[(48, Counter({'0': 2})), 
(32, Counter({'6': 2})), 
(49, Counter({'2': 2})), 
(50, Counter({'0': 2})), 
(51, Counter({'X': 1, 'T': 1})), 
(53, Counter({'2': 1})), 
(13, Counter({'A': 1, 'D': 1})), 
(45, Counter({'A': 1, 'T': 1})), 
(14, Counter({'T': 2})), 
(54, Counter({'0': 1})), 
(47, Counter({'2': 2}))] 
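
(A side note of my own, not from the answer: Counter is itself callable, so the lambda can be dropped, and a final mapValues(dict) gives plain dicts if Counter objects are awkward downstream.)

rdd.groupByKey().mapValues(Counter).mapValues(dict).collect() 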

Oh, you already used Counter! Unfortunately 'groupByKey' should be avoided, since it collects all the values for each key together; doing it in two operations instead of one is fine, but one is neater! – ipoteka


@ipoteka Interesting, I wasn't aware of the inefficiency of 'groupByKey'. Do you have a good reference that elaborates on this? – ohruunuruus


http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html – ipoteka
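
For illustration, here is a sketch (mine, not from the thread) of the same per-key counts computed without groupByKey, using aggregateByKey on the rdd defined in the answer above; it combines map-side, so only small partial Counters cross the shuffle:

from collections import Counter 

counts = rdd.aggregateByKey(Counter(), 
                            lambda acc, ch: acc + Counter([ch]),  # fold one character into the partial count 
                            lambda a, b: a + b)                   # merge partial counts across partitions 
counts.collect() 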


If I understand you right, you can do this in a single operation with combineByKey:

from collections import Counter 
x = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
result = x.combineByKey(lambda value: {value: 1},                                  # createCombiner: first value seen for a key 
                        lambda acc, value: dict(Counter(acc) + Counter([value])),  # mergeValue: count one more occurrence 
                        lambda a, b: dict(Counter(a) + Counter(b)))                # mergeCombiners: merge per-partition dicts 
result.collect() 
[(32, {'6': 2}), (48, {'0': 2}), (49, {'2': 2}), (53, {'2': 1}), (13, {'A': 1, 'D': 1}), (45, {'A': 1, 'T': 1}), (50, {'0': 2}), (54, {'0': 1}), (14, {'T': 2}), (51, {'X': 1, 'T': 1}), (47, {'2': 2})] 

Looks like with this solution key 13 gets ('A', 2) instead of [('A', 1), ('D', 1)]. – ohruunuruus


Hmm, I had assumed 13 only corresponds to 'A'; I'll change my answer. Thanks! – ipoteka


The OP needs a count of each character for each key. – ohruunuruus


Instead of:

.map(lambda x: (x[0], [x[1], 1])) 

we can do:

.map(lambda x: ((x[0], x[1]), 1)) 

Then in the last step we can use reduceByKey with add. Note that add comes from the operator module.

Putting it all together:

from operator import add 
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]) 
# key by the (key, char) pair and sum the 1s, yielding ((key, char), count) pairs 
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
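
This yields ((key, char), count) pairs. If the counts should be regrouped under each original key, one more pass reshapes the result; this is a sketch of my own, not part of the answer, reusing the rdd above:

counts = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add) 
# turn ((key, char), count) into (key, (char, count)) and collect the pairs per key 
per_key = counts.map(lambda kv: (kv[0][0], (kv[0][1], kv[1]))).groupByKey().mapValues(list) 
per_key.collect()  # e.g. key 13 -> [('D', 1), ('A', 1)] 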