2017-03-27 47 views
0

我有2个rdd,其中一个作为词典列表,另一个作为元组列表,如下所示 -如何使用pyspark在字典列表中添加元组值的列表?

rdd1 = [{'id1',['string','string',count]},{'id2 ',['string','string',count]},{'id3',['string','string',count]}] rdd2 = [(id1,count),(id2,count),( id3,count)]

现在我想添加从rdd2到rdd1的计数,如果rdd2的id与rdd1匹配。 你能帮我实现吗?

预先感谢您。

回答

2

虽然盖茨的答案是正确的,但您应该尽量避免在使用RDD时使用for循环。在RDDS操作并行化,并要快得多相比,循环大datasets.You工作可以通过连接两个RDDS和重新格式化输出达到同样的当:

rdd1 = sc.parallelize([{'id1':['string','string',1]}, {'id2':['string','string',2]}, {'id3':['string','string',3]}]) 
rdd2 = sc.parallelize([('id1',2), ('id2',4), ('id3',6), ('id4',8)]) 
rdd_joined = rdd1.flatMap(lambda x:x.items()).join(rdd2) 
rdd_reformatted = rdd_joined.map(lambda (x,(y,z)):{x:y[:-1]+[y[-1]+z]}) 

rdd_reformatted.collect()给作为输出:

[{'id2': ['string', 'string', 6]}, 
{'id3': ['string', 'string', 9]}, 
{'id1': ['string', 'string', 3]}] 
+0

谢谢哈科。你很棒。从你的代码中学到了很多东西。 –

0

我希望这有助于。

rdd1 = [{'id1':['string','string',1]}, {'id2':['string','string',2]}, {'id3':['string','string',3]}] 
rdd2 = [('id1',2), ('id2',4), ('id3',6), ('id4',8)] 

for each in rdd2: 
    there = False 
    position = 0 
    for ele in rdd1: 
     if each[0] in ele.keys(): 
      #now increment the count 
      original = rdd1[position] 
      originalList = original[each[0]] 
      #updating the 3rd element 
      newList = originalList 
      newList[2] = originalList[2] + each[1] 
      #update the new list to key 
      updated = { each[0] : newList } 
      rdd1[position] = updated 
      there = True 
      break 
     position = position + 1 
print rdd1 
#output: [{'id1': ['string', 'string', 3]}, {'id2': ['string', 'string', 6]}, {'id3': ['string', 'string', 9]}] 
+0

非常感谢。它真的帮助了我。 –

+0

我很受欢迎。从雅科的回答中学到很多东西。 –