2016-01-17 75 views
0

我正在使用Spark API for Java。我有一个JavaPairRDD,其中的密钥k被压缩为一个字节序列。我想将解压缩函数(我已经写过)传递给KEY(而不是值)。这些密钥在解压缩后仍然是唯一的,我希望它们与它们的对应值配对,即将函数传递给JavaPairRDD中的KEY <K,V>

一种方法是对myHashMap = myPairRDD.collectAsMap()然后mySet = myHashMap.keySet(),但它不会再平行地完成,并且密钥将会脱离他们的价值观。

另一种方法是使用mySingleRDD = myPairRDD.keys()但随后键会从相应的值被分离,诉

有没有人有一个更好的办法?

回答

0

这里是伪代码。将旧的RDD转换为新的RDD,其中新的RDD的密钥是未压缩的。

newRDD = oldRdd.map((key, value) => (decompress(key), value)) 
1

RDDS支持两种类型的操作:转换,从现有的创建新的数据集,并行动,其上运行的数据集的计算后的值返回驱动程序。

对于所提出的问题,您应该使用mapToPair,这是一种转换,它通过(解压缩)函数传递每个JavaPairRDD元素并返回一个新的JavaPairRDD。
结果RDD上的每个键/值条目类型为Tuple2 <K, V>
在此,我用Tuple2<Object, Object>以键/值,也假设你有解压()功能键:

的Java 8:

JavaPairRDD<Object, Object> result = pairRDD.mapToPair( 
       (Tuple2<Object, Object> pair) -> new Tuple2<Object, Object>(uncompress(pair._1()), pair._2())); 

的Java 6/7: (不能避免非兰布达地狱......)

javaPair.mapToPair(new PairFunction<Tuple2<Object,Object>, Object, Object>() { 
    @Override 
    public Tuple2<Object, Object> call(Tuple2<Object, Object> pair) throws Exception { 
     return new Tuple2<Object, Object>(uncompress(pair._1()), pair._2()); 
    }});