2016-08-19 40 views
2

我有一个RDD这样的:如何分组并加入火花?

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" } 

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" } 

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" } 

我的目标是 第一组由key1的,然后按键2 最后加KEY3

我期待最终结果等,

key1   key2  key3 
"fruit"  , "US" , 3 
"vegetable" , "US" , 1 
"fruit"  , "Japan" , 3 
"vegetable" , "Japan" , 3 

我的代码开始如下,

rdd_arm = rdd_arm.map(lambda x: x[1]) 

rdd_arm包括上述键:值的格式。

我不知道下一步该去哪里。 有人能帮助我吗?

回答

1

让我们创建您的RDD:

In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }]) 
In [2]: rdd_arm.collect() 
Out[2]: 
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'US', 'key3': '2'}, 
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'}, 
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}] 

首先,你必须创建一个新的密钥,这将是对key1key2。它的价值将是key3,所以你想要做这样的事情:

In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])) 

In [4]: new_rdd.collect() 
Out[4]: 
[('fruit, US', '1'), 
('fruit, US', '2'), 
('vegetable, US', '1'), 
('fruit, Japan', '3'), 
('vegetable, Japan', '3')] 

然后,我们想补充一点,是重复键的值,仅仅是调用reduceByKey(),像这样:

In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b)) 

In [6]: new_rdd.collect() 
Out[6]: 
[('fruit, US', 3), 
('fruit, Japan', '3'), 
('vegetable, US', '1'), 
('vegetable, Japan', '3')] 

我们完成了!


当然,这可能是一个内胆,像这样:

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b)) 
+1

你好gsamaras。感谢您的跟进。 –

2

我自己解决了。

我不得不创建一个包含多个密钥的密钥,然后加起来。

rdd_arm.map(lambda x : x[0] + ", " + x[1] , x[2]).reduceByKey(lambda a,b : a + b) 

下面的问题是有用的。

How to group by multiple keys in spark?

+0

请允许我说,这并没有为我工作,我得到未定义的名称的错误,并得到后越过他们,我无法做到这一点。因此我发布了一个新的答案,希望你喜欢它!尽管我提出了这个问题,因为它让我练习了!谢谢! – gsamaras