
Splitting an RDD into RDDs with no duplicate values

I have an RDD of pairs as follows:

(105,918) 
(105,757) 
(502,516) 
(105,137) 
(516,816) 
(350,502) 

I want to split it into two RDDs, such that the first contains only pairs with non-repeating keys and values (no key or value appears twice within it), and the second holds the remaining, omitted pairs.

So from the above we would get these two RDDs:

1) (105,918) 
    (502,516) 

2) (105,757) - Omitted as 105 is already included in 1st RDD 
    (105,137) - Omitted as 105 is already included in 1st RDD 
    (516,816) - Omitted as 516 is already included in 1st RDD 
    (350,502) - Omitted as 502 is already included in 1st RDD 

Currently I use a mutable Set to track the elements already selected, after coalescing the original RDD into a single partition:

val evalCombinations = collection.mutable.Set.empty[String]
val currentValidCombinations = allCombinations
  .filter(p => {
    if (!evalCombinations.contains(p._1) && !evalCombinations.contains(p._2)) {
      evalCombinations += p._1; evalCombinations += p._2; true
    } else
      false
  })
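For reference, here is a minimal sketch of the single-partition setup described above, assuming a spark-shell session with sc in scope; the pairs name, the sample data, and the coalesce(1) call are assumptions, since the question shows only the filter:

import org.apache.spark.rdd.RDD

// Hypothetical setup; only the filter appears in the question, so the
// name `pairs` and this sample data are invented for illustration.
val pairs: RDD[(String, String)] =
  sc.parallelize(Seq(("105", "918"), ("105", "757"), ("502", "516")))

// Coalescing into one partition makes the filter run sequentially on a
// single executor, so the mutable Set sees every previously kept element.
val allCombinations = pairs.coalesce(1)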

This approach is limited by the memory of the executor the operation runs on. Is there a better, more scalable solution?


I'm confused by the description and the example. The description says "split it into two RDDs, such that the first contains only pairs with non-repeating values (for keys and values)". Yet the example shows resulting RDDs containing 105, which is repeated 3 times in the sample, 502 twice, and 516 twice. In fact, following the description, the resulting RDD in your example should be empty. Care to clarify? – maasg


@maasg The way I read it, the RDD needs to be split into two parts. The first RDD should contain only unique keys and values; any pair with a duplicated key or value goes into the second RDD. So in the example, the first RDD is ((105,918), (502,516)): within that set of keys and values there are no duplicates. –

Answer


Here is a version that will require more driver memory instead.

import org.apache.spark.rdd._
import org.apache.spark._

def getUniq(rdd: RDD[(Int, Int)], sc: SparkContext): RDD[(Int, Int)] = {

  val keys = rdd.keys.distinct
  val values = rdd.values.distinct

  // these are the keys which also appear in the value part
  val both = keys.intersection(values)

  val bBoth = sc.broadcast(both.collect.toSet)

  // remove those key-value pairs whose value is also a key
  val uKeys = rdd.filter(x => !bBoth.value.contains(x._2))
                 .reduceByKey { case (v1, v2) => v1 }  // keep unique keys

  uKeys.map { case (k, v) => (v, k) }                  // swap key, value
       .reduceByKey { case (v1, v2) => v1 }            // keep unique values
       .map { case (k, v) => (v, k) }                  // swap back
}
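As an illustration (not part of the original answer), here is getUniq traced on the sample data from the question; which pair survives for key 105 depends on reduceByKey ordering:

// keys.distinct = {105, 502, 516, 350}; values.distinct = {918, 757,
// 516, 137, 816, 502}; both = {516, 502}, so (502,516) and (350,502)
// are filtered out, and the two reduceByKey passes keep one pair per
// key and one pair per value from what remains.
val sample = sc.parallelize(Array((105,918), (105,757), (502,516),
                                  (105,137), (516,816), (350,502)))
getUniq(sample, sc).collect
// e.g. Array((105,918), (516,816))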

def getPartitionedRDDs(rdd: RDD[(Int, Int)], sc: SparkContext) = {

  // first pass: pairs already unique in both key and value
  val r = getUniq(rdd, sc)
  val remaining = rdd subtract r

  // drop remaining pairs that reuse any key or value picked in the first pass
  val set = r.flatMap { case (k, v) => Array(k, v) }.collect.toSet
  val a = remaining.filter { case (x, y) => !set.contains(x) &&
                                            !set.contains(y) }

  // second pass: pairs that became unique once the first picks were removed
  val b = getUniq(a, sc)

  val part1 = r union b
  val part2 = rdd subtract part1
  (part1, part2)
}

val rdd = sc.parallelize(Array((105,918),(105,757),(502,516), 
           (105,137),(516,816),(350,502))) 

val (first, second) = getPartitionedRDDs(rdd, sc) 
// first.collect: ((516,816), (105,918), (350,502)) 
// second.collect: ((105,137), (502,516), (105,757)) 

val rdd1 = sc.parallelize(Array((839,841),(842,843),(840,843), 
           (839,840),(1,2),(1,3),(4,3))) 

val (f, s) = getPartitionedRDDs(rdd1, sc) 
// f.collect: ((839,841), (1,2), (840,843), (4,3))
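Since the second RDD is just the input minus the first, the complement for this run follows by set difference (derived here, not shown in the original answer):

// s.collect: ((842,843), (839,840), (1,3))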

Thanks Shyamendra. I was able to test this today. Using driver memory does seem better than executor memory in this case. Unfortunately, it does not seem to work with the following data: 'val rdd1 = sc.parallelize(Array((839,841),(842,843),(840,843),(839,840)))' –


Yes, it did not check for duplicate values; I have updated the answer. –


Thanks Shyamendra. –