2016-05-15 56 views
0

如果我有一个数据集与此类似:对RDD转换

val list = List ((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))

而且我想找到每个键的平均所以输出应该是:

(1, 2), (2, 3/2), (3, 2)我能做到这一点使用groupByKey, countByKey, and reduceByKey莫名其妙或我必须使用类似于下面的示例combineByKey方法:我尝试使用groupByKey, countByKey, and reduceByKey但这种方法的组合不起作用,我想知道是否有人知道使用这三种方法做到这一点?

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 

result.collectAsMap().map(println(_)) 

回答

4

你应该尝试以下操作:

val sc: SparkContext = ... 
val input = sc.parallelize(List((1,1), (1,2), (1,3), (2,2), (2,1), (3,1), (3,3))) 
val averages = input.groupByKey.map { case (key, values) => 
    (key, values.sum/values.size.toDouble) 
} 

println(averages.collect().toList) // List((1,2.0), (2,1.5), (3,2.0)) 
+0

哦,对了,我忘当你groupByKey它把值在数组中,所以你可以使用之和大小在该阵列上!感谢您的帮助! – CapturedTree

1

那么你可以简单地使用PairRDDFunctions.groupByKey并计算你想要什么。

val avgKey = input.groupByKey.map{ 
    case (k, v) => (k, v.sum.toDouble/v.size) 
} 
avgkey.collect 
//res2: Array[(Int, Double)] = Array((3,2.0), (1,2.0), (2,1.5)) 
1

使用reduceByKey,与二倍体之前转化为三胞胎

rdd.map{ case(k,v) => (k,(v,1)) }. 
    reduceByKey((a,v) => (a._1+v._1, a._2+v._2)). 
    map {case (k,v) => (k, v._1/v._2)} 
+0

你好榆木!对不起,对于迟到的回复,但是你能解释一下当你在map函数中使用'case'吗?只是为了让你可以将参数写成'(k,v)',而不必具体指定'k'和'​​v'的结构。例如,如果'k'是一个元组,在地图中没有情况下,我将不得不写'((a1,a2),v)'?所以它在技术上只适用于模式匹配? – CapturedTree

+0

使用'case'我们*启用*模式匹配,提取或分解数据结构,我们使用大括号来定义一个部分函数(并非所有的模式都需要定义)。另一方面,元组数据结构的使用涉及使用它自己的方法(._1和._2)来获取(提取)数据项。 – elm

相关问题