2016-03-10 56 views
0

后我有一个RDD P映射到类:火花:TOPN的GroupBy

case class MyRating(userId:Int, itemId:Int, rating:Double) 

我对寻找为每个用户即的GroupBy 用户id并且每个形成的组内TOPN条目,滤出TopN(例如10)条目基于最高评分

我做了以下内容:

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey 
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false)) 
val C = values.groupByKey.take(10) 

显然应用。取(10)groupByKey离开后,只有10个键(用户),我并不会筛选出每个用户的TOP10评级。

我们如何去应用.take(N)后面的groupBy,以便它作用于某个部分的值而不是key本身?

回答

3

天真的方法是采取ñ值:

B.mapValues(_.take(n)) 

但如果你需要的值只有小部分会更好地使用例如aggregateByKey和运行,而不是分组丢弃过时的记录一切。你可能希望要的东西在实践中更有效的(你可以检查top/takeOrdered星火实现),但你可以像这样开始:

import scala.math.Ordering 
import scala.collection.mutable.PriorityQueue 

implicit val ord = Ordering.by[MyRating, Double](_.rating) 

val pairs = rdd.keyBy(_.userId) 
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
    (acc, x) => { 
    acc.enqueue(x) 
    acc.take(n) 
    }, 
    (acc1, acc2) => (acc1 ++ acc2).take(n) 
) 

注意,上面的代码需要斯卡拉2.11+由于SI-7568

+0

谢谢,aggregateByKey更有意义..看起来像一个强大的构造。 – srbhkmr

2

如果我理解正确的话,你需要做的是: 组RDD通过用户ID,然后为每个(ID,列表)元组给予回复的ID和排序,并修剪成10个元素的列表

P 
    .groupBy(_.userId) 
    .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
    } 
1

您非常接近,但您需要在A到B的映射范围内取前N个条目。例如,如果您想从列表中取得前2个MyRating项目,则下面的代码将执行招。 B将是一个RDD,其中包含每个userId的前两名MyRating的列表。 (此外,sortBy函数将简单地通过使评级为负)工作。

case class MyRating(userId:Int, itemId:Int, rating:Double) 

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7)) 
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist) 

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey 
val TOPCOUNT = 2 
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT)) 
1

下面是使用aggregateByKey通过zero323的建议为例:

val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId) 
val B = A.aggregateByKey(List[MyRating]())(
    (l, r) => (l :+ r).sortBy(-_.rating).take(10), 
    (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10)) 

使用这种方法的好处是,你没有可能打乱你的遗嘱执行人之间的大量数据。如果来自单个用户的评级分布在多个节点上,则groupBy需要将用户的所有评级发送给同一个执行者,而使用aggregateByKey首先在每个执行者上建立前N列表,然后只有那些列表被混洗并结合起来。

这是否对您有益取决于数据的分布。如果你的收视率比最终收视率还要高,那么你并没有获得太多的收视率(尤其是对于每个单独的收视率进行排序的我的天真实施)。但是,如果每个执行者的评分数量大一个数量级,您就可以赢得很多。

+0

感谢您解释这两种方法之间的差异。非常感激。 – srbhkmr