2016-12-19 27 views
1

我期待得到pyspark两个RDD的交集。他们看起来像下面这样:Pyspark路口

rdd1 = sc.parallelize(["abc","def", "ghi"]) 
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"]) 

使用pyspark的RDD运营商获得是否有可能:

intersection_rdd --> ["abc","123"] ["ghi","678"] 

回答

2

通过PySpark RDD一个快速的方法是使用join但要注意,它需要两个RDDS是相同的尺寸。要做到这一点,我们将与您的例子开始下面

rdd1 = sc.parallelize([["abc"],["def"], ["ghi"]]) 
rdd2 = sc.parallelize([["abc", 123],["df", 345], ["ghi", 678]]) 

然后,您可以创建rdd1a所以它的尺寸相同rdd2

rdd1a = rdd1.map(lambda x: (x[0], 1)) 

然后你就可以运行join

rdd1a.join(rdd2).map(lambda x: (x[0], x[1][1])).collect() 
## Out[25]: [('abc', 123), ('ghi', 678)] 

注意,这可能不是大RDDS但其获得此出一个快速,快捷的方式将高性能的方法。

另一种方法是利用DataFrames按照以下:

df1 = rdd1.toDF(['col']) 
df2 = rdd2.toDF(['col', 'value']) 
df_intersect = df1.join(df2, df1.col == df2.col, 'inner').select(df1.col, df2.value) 
df_intersect.show() 

与输出是:

+---+-----+ 
|col|value| 
+---+-----+ 
|ghi| 678| 
|abc| 123| 
+---+-----+ 
0

你会尝试这种解决你的问题:

rdd1 = sc.parallelize([[x] for x in ["abc","def", "ghi"]]) 
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"]])  
df1 = rdd1.toDF(['key']) 
df2 = rdd2.toDF(['key', 'value']) 
intersect = df1.join(df2, 'key').orderBy('key') 
intersect.show() 

输出:

+---+-----+ 
|key|value| 
+---+-----+ 
|abc| 123| 
|ghi| 678| 
+---+-----+