2016-01-04 44 views
2

我正在寻找一种方法来智能地比较RDD的子集。比较RDD的子集

可以说我有一个类型为(Int-> T)的键/值对的RDD。我最终需要说“将键1的所有值与键2的所有值进行比较,并将键3的值与键5和键7的值进行比较”,我将如何有效地做到这一点?

目前我想这样做的方法是通过创建过滤RDDS的列表,然后使用RDD.cartesian()

def filterSubset[T] = (b:Int, r:RDD[(Int, T)]) => r.filter{case(name, _) => name == b} 

Val keyPairs:(Int, Int) // all key pairs 

Val rddPairs = keyPairs.map{ 

      case (a, b) => 

       filterSubset(a,r).cartesian(filterSubset(b,r)) 

     } 

rddPairs.map{whatever I want to compare…} 

然后,我会遍历列表,并在每个执行地图对的RDD收集我需要的关系数据。

我不能告诉这个想法的是,设置数百个地图作业的可能性是否极其低效,然后遍历它们。在这种情况下,spark中的懒惰估值会优化所有映射之间的数据混洗吗?如果不是,有人可以推荐一种可能更有效的方法来解决这个问题吗?

谢谢您的帮助,您可以解决这个问题

+1

你能更好地解释你的比较逻辑是什么?你想比较哪些键? –

+1

当然,如果我有一个RDD [(Int,Vector)],那么一个很好的例子就是,我只想计算带有相关键的向量的余弦相似性(这些键基本上划分了不同的向量组)。 –

+1

每个密钥有多少数据?它是一对一的映射还是您想与多个子集进行比较(如{1,{2,3,5,7})? – zero323

回答

3

一种方式是复制和分区数据,以反映要比较密钥对。让我们开始创建从实际键两张地图的临时密钥我们将使用复制和联接:

def genMap(keys: Seq[Int]) = keys 
    .zipWithIndex.groupBy(_._1) 
    .map{case (k, vs) => (k -> vs.map(_._2))} 

val left = genMap(keyPairs.map(_._1)) 
val right = genMap(keyPairs.map(_._2)) 

接下来,我们可以通过新的密钥复制转换数据:

def mapAndReplicate[T: ClassTag](rdd: RDD[(Int, T)], map: Map[Int, Seq[Int]]) = { 
    rdd.flatMap{case (k, v) => map.getOrElse(k, Seq()).map(x => (x, (k, v)))} 
} 

val leftRDD = mapAndReplicate(rddPairs, left) 
val rightRDD = mapAndReplicate(rddPairs, right) 

最后我们可以协同组:

val cogrouped = leftRDD.cogroup(rightRDD) 

而且比较/过滤器对:

cogrouped.values.flatMap{case (xs, ys) => for { 
    (kx, vx) <- xs 
    (ky, vy) <- ys 
    if cosineSimilarity(vx, vy) <= threshold 
} yield ((kx, vx), (ky, vy)) } 

显然在目前的形式这种方法是有限的。它假设任意一对密钥的值可以放入内存并需要大量的网络通信量。不过它应该给你一些想法如何继续。

另一种可能的方法是将数据存储在外部系统(例如数据库)中并按需获取所需的键值对。

既然你试图找到元素之间的相似性,我也会考虑完全不同的方法。我会尝试使用反映文档之间预期相似性的自定义分区程序对数据进行分区,而不是单纯地比较按键。一般来说这不是微不足道的,而应该给出更好的结果。

+0

谢谢你zero323。这给了我很多潜在的方向来参与这个项目(我特别喜欢使用定制分区的想法) –