2017-10-12 29 views
0

在PySpark,我有2 RDD的其结构为(键,名单列表):Pyspark:使用地图功能,而不是收集迭代RDDS

input_rdd.take(2) 
[(u'100', 
    [[u'36003165800', u'70309879', u'1']]), 
(u'200', 
    [[u'5196352600', u'194837393', u'99']]) ] 

output_rdd.take(2) 
[(u'100', 
    [[u'875000', u'5959', u'1']]), 
(u'300', [[u'16107000', u'12428', u'1']])] 

现在我想要一个结果RDD(如图所示下面),它将基于键的两个RDD分组,并按顺序给出输出为元组(键,(,))。当输入或输出中没有任何键时,那么该rdd的列表保持为空。

[(u'100', 
([[[u'36003165800', u'70309879', u'1']]], 
[[[u'875000', u'5959', u'1']]]), 
(u'200', 
([[[u'5196352600', u'194837393', u'99']]], 
    [])), 
(u'300',([],[[[u'16107000', u'12428', u'1']]]) 
] 

为了获得所得RDD我使用的下面的代码段使用

resultant=sc.parallelize(x, tuple(map(list, y))) for x,y in sorted(list(input_rdd.groupWith(output_rdd).collect())) 

是否与groupWith功能办法可以除去.collect(),而使用.MAP()在Pyspark中获得相同的结果RDD?

+0

给空RDD – raul

回答

0

完全外部联结给出:

input_rdd.fullOuterJoin(output_rdd).collect() 
# [(u'200', ([[u'5196352600', u'194837393', u'99']], None)), 
# (u'300', (None, [[u'16107000', u'12428', u'1']])), 
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))] 

要更换None[]

input_rdd.fullOuterJoin(output_rdd).map(lambda x: (x[0], tuple(i if i is not None else [] for i in x[1]))).collect() 

# [(u'200', ([[u'5196352600', u'194837393', u'99']], [])), 
# (u'300', ([], [[u'16107000', u'12428', u'1']])), 
# (u'100', ([[u'36003165800', u'70309879', u'1']], [[u'875000', u'5959', u'1']]))]