2

这是来自here的后续问题。我正在尝试基于这个implementation实现k-means。它很好,,但我想取代groupByKey()reduceByKey(),但我不知道如何(我不担心现在的表现)。下面是相关缩小的代码:用reduceByKey替换groupByKey()

val data = sc.textFile("dense.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache() 

val read_mean_centroids = sc.textFile("centroids.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))) 
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2) 
do { 
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2)) 
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :) 
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap() 
    .. 

注意println(newCentroids)会给:

地图(23 - >(-6.269305E-4,-0.0011746404,-4.08004E-5),8 - >(-5.108732E-4,7.336348E-4,-3.707591E-4),17 - >(-0.0016383086,-0.0016974678,1.45 ..

println(closest)和:

MapPartitionsRDD [6] at map at kmeans.scala:75

相关问题:Using reduceByKey in Apache Spark (Scala)


一些documentation

DEF reduceByKey(FUNC:(V,V)⇒V):RDD [(K,V)]

合并的值用于每个键使用关联减少函数。

DEF reduceByKey(FUNC:(V,V)⇒V,numPartitions:智力):RDD [(K,V)]

合并使用的缔合减少函数中的每个键的值。

DEF reduceByKey(分割器:分区程序,FUNC:(V,V)⇒V):RDD [(K,V)]

使用关联减少功能合并用于每个键的值。

DEF groupByKey():RDD [(K,可迭代[V])]

组在RDD每个键成一个单一的序列值。

回答

5

你可以使用一个aggregateByKey()(略高于reduceByKey()更自然)这样来计算newCentroids

val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
    (agg, v) => (agg._1 += v, agg._2 + 1L), 
    (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2) 
).mapValues(agg => agg._1/agg._2).collectAsMap 

对于这个工作,你需要计算你的数据,即dim的维度,但你只需要做一次。你可以使用类似val dim = data.first._2.length的东西。

+0

工作就像一个魅力!你能解释我们在这里做了什么吗?我的意思是为什么我想用reduceByKey()替换groupByKey()?这样做的主要优点是什么?相关:http://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work – gsamaras

+1

好吧,'groupByKey'会导致一堆东西被发送到各个节点之间即所有与给定键相关的值,用于所有键和数据的部分。另一方面,使用'aggregateByKey'方法,每个部分只负责向(向驾驶员)传送由总和和计数组成的对。这么少的网络通信以及无需创建所有这些值的集合(因为它只是它们的总和和数量在计算平均值时很重要)。 –

+0

好吧,这就是我的想法,非常感谢! – gsamaras