2016-08-19 114 views
0

我有rdd类型RDD[(String, String)],我想输出RDD[(String, String, Int)]其中int将被计数的类似集合。例如:Apache Spark - 如何计算配对RDD中的相似键/值对

输入RDD:

java, perl 
.Net, php 
java, perl 

输出RDD:

java, perl, 2 
.Net, php, 1 

我尝试添加IntInput RDD(如1),所以,现在Input RDD变为:

[(String, String, Int)]其中Int为1.

但是t.reduceByKey((a,b,c) => (a,b,c))发生错误。

回答

1

您可以通过连接两个值来创建一个新的密钥,然后添加如下显示:

lines = sc.parallelize(["java, perl", ".Net, php", "java, perl"]) 
splitted = lines.map(lambda l: l.split(",")) 
processed = splitted.map(lambda l: (l[0] + "," + l[1], 1)) 
reduced = processed.reduceByKey(lambda a, b: a+b) 

或者干脆把整条线路为“关键”:

lines = sc.parallelize(["java, perl", ".Net, php", "java, perl"]) 
processed = lines.map(lambda l: (l, 1)) 
reduced = processed.reduceByKey(lambda a, b: a + b) 

输出:

>>> lines.collect() 
['java, perl', '.Net, php', 'java, perl'] 
>>> reduced.collect() 
[('.Net, php', 1), ('java, perl', 2)] 

编辑:

您可以定义一个函数来格式化数据,并使用map转型:

def formatter(line): 
    skills = line[0].split() 
    return skills[0], skills[1], line[1] 

threecols = reduced.map(formatter) 
+0

感谢Orions队的答复。其实我需要输出显示在数据框中。如果我将使用df()函数将输出转换为数据框,则会生成两列,但我需要三列输出。第1列skill1(java),第2列skill2(perl),第3列count(int)。 –

相关问题