我期待得到pyspark
两个RDD的交集。他们看起来像下面这样:Pyspark路口
rdd1 = sc.parallelize(["abc","def", "ghi"])
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"])
使用pyspark的RDD运营商获得是否有可能:
intersection_rdd --> ["abc","123"] ["ghi","678"]
我期待得到pyspark
两个RDD的交集。他们看起来像下面这样:Pyspark路口
rdd1 = sc.parallelize(["abc","def", "ghi"])
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"])
使用pyspark的RDD运营商获得是否有可能:
intersection_rdd --> ["abc","123"] ["ghi","678"]
通过PySpark RDD一个快速的方法是使用join
但要注意,它需要两个RDDS是相同的尺寸。要做到这一点,我们将与您的例子开始下面
rdd1 = sc.parallelize([["abc"],["def"], ["ghi"]])
rdd2 = sc.parallelize([["abc", 123],["df", 345], ["ghi", 678]])
然后,您可以创建rdd1a
所以它的尺寸相同rdd2
。
rdd1a = rdd1.map(lambda x: (x[0], 1))
然后你就可以运行join
:
rdd1a.join(rdd2).map(lambda x: (x[0], x[1][1])).collect()
## Out[25]: [('abc', 123), ('ghi', 678)]
注意,这可能不是大RDDS但其获得此出一个快速,快捷的方式将高性能的方法。
另一种方法是利用DataFrames
按照以下:
df1 = rdd1.toDF(['col'])
df2 = rdd2.toDF(['col', 'value'])
df_intersect = df1.join(df2, df1.col == df2.col, 'inner').select(df1.col, df2.value)
df_intersect.show()
与输出是:
+---+-----+
|col|value|
+---+-----+
|ghi| 678|
|abc| 123|
+---+-----+
你会尝试这种解决你的问题:
rdd1 = sc.parallelize([[x] for x in ["abc","def", "ghi"]])
rdd2 = sc.parallelize([["abc","123"],["df",345], ["ghi","678"]])
df1 = rdd1.toDF(['key'])
df2 = rdd2.toDF(['key', 'value'])
intersect = df1.join(df2, 'key').orderBy('key')
intersect.show()
输出:
+---+-----+
|key|value|
+---+-----+
|abc| 123|
|ghi| 678|
+---+-----+