2016-03-07 29 views
0

假设之间有2个RDDSpyspark我怎么加两个RDDS用相同的密钥匹配

其中RDD1 has (key1,key2,value)

RDD2 has (key1, value)

现在我想结合的操作(如+或减)从RDD2到RDD1集key1的地方有一个比赛 这里例如

RDD1 has [1,1,3],[1,2,2],[2,2,5] 

RDD2 = sc.parallelize([1,1]) 

我想导致

RDD3 to [1,1,4],[1,2,3],[2,2,5] only the first and second data was added while third one wasn't 

我尝试使用左外连接到找到key1的比赛,并做一些操作,但我会失去那些不需要做手术,有没有办法做到在部分数据操作中的数据?

+0

你能澄清一下左外连接的问题吗? –

回答

1

假设你想配对操作,或者你的数据包含1至0..1关系中,你可以做最简单的事情是双方RDDS转换为DataFrames

from pyspark.sql.functions import coalesce, lit 

df1 = sc.parallelize([ 
    (1, 1, 3), (1, 2, 2), (2, 2, 5) 
]).toDF(("key1", "key2", "value")) 

df2 = sc.parallelize([(1, 1)]).toDF(("key1", "value")) 

new_value = (
    df1["value"] + # Old value 
    coalesce(df2["value"], lit(0)) # If no match (NULL) take 0 
).alias("value") # Set alias 

df1.join(df2, ["key1"], "leftouter").select("key1", "key2", new_value) 

您可以轻松地调整这种通过处理其他场景在加入DataFrames之前在df2上应用聚合。

相关问题