2016-10-18

How to split an RDD into two RDDs and keep the results as RDDs with PySpark?

I am looking for a way to split an RDD into two or more RDDs, keeping each result as a separate RDD. For example, given:

rdd_test = sc.parallelize(range(50), 1) 

My code:

def split_population_into_parts(rdd_test): 
    N = 2 
    repartionned_rdd = rdd_test.repartition(N).distinct() 
    rdds_for_testab_populations = repartionned_rdd.glom() 
    return rdds_for_testab_populations 

rdds_for_testab_populations = split_population_into_parts(rdd_test) 

which gives:

[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48], [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49]]

Now I want to turn each of these lists into its own new RDD, e.g. RDD1 and RDD2. How can I do that? Thanks!

Answer


I found a solution.

def get_testab_populations_tables(rdds_for_testab_populations): 
    # Each element yielded by the glommed RDD is one partition's list of values; 
    # re-parallelize each list into its own RDD, bound to a global name tAB_<i>. 
    i = 0 
    for testab_table in rdds_for_testab_populations.toLocalIterator(): 
        namespace = globals() 
        namespace['tAB_%d' % i] = sc.parallelize(testab_table) 
        i += 1 

Then you can do:

print tAB_0.collect() 
print tAB_1.collect() 
etc.
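When the number of groups is fixed (two here), a simpler alternative is to call `filter` once per group, which avoids `collect()` and `globals()` and keeps everything as RDDs. Below is a minimal local sketch using plain Python lists in place of RDDs, since no SparkContext is assumed here; on a real RDD the same predicates would be passed to `rdd_test.filter(...)`:

```python
# Plain-Python stand-in for the filter-based split; with Spark you would write:
#   rdd1 = rdd_test.filter(lambda x: x % 2 == 0)
#   rdd2 = rdd_test.filter(lambda x: x % 2 == 1)
data = list(range(50))

# One pass per group, mirroring two filter() calls on the RDD.
rdd1_local = [x for x in data if x % 2 == 0]  # the "even" population
rdd2_local = [x for x in data if x % 2 == 1]  # the "odd" population

print(rdd1_local)
print(rdd2_local)
```

If the goal is randomized A/B populations rather than an even/odd split, `rdd_test.randomSplit([0.5, 0.5])` also returns a list of RDDs directly.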