2017-05-31 30 views
1

我有一个spark java程序,其中带有mapValues步骤的groupByKey已完成,并返回一个PairRDD,其值为所有输入rdd值的Iterable。 我已经读过,用mapValues代替groupByKey处的reduceByKey会带来性能上的提升,但我不知道如何在这里应用reduceByKey来解决我的问题。如何使用reduceByKey替换groupByKey以作为Spark java中的Iterable值返回?

具体而言,我有一个输入对RDD,其类型为Tuple5。在groupByKey和mapValues转换之后,我需要获得一个Key-Value对RDD,其中的值需要是输入值的可迭代。

JavaPairRDD<Long,Tuple5<...>> inputRDD; 
... 
... 
... 
JavaPairRDD<Long, Iterable<Tuple5<...>>> groupedRDD = inputRDD 
    .groupByKey() 
    .mapValues(
      new Function<Iterable<Tuple5<...>>,Iterable<Tuple5<...>>>() { 

       @Override 
       public Iterable<Tuple5<...>> call(
         Iterable<Tuple5<...>> v1) 
         throws Exception { 

        /* 
        Some steps here..        
        */ 

        return mappedValue; 
       } 
      }); 

有没有办法让我能得到使用reduceByKey上述转型?

+0

这里有些步骤是什么?你需要一个逻辑来减少它。 – philantrovert

+0

在'mapValues'函数中,我实际上是根据'Tuple5'中的一个键对每个值进行排序。我认为这里没有关系,这就是为什么我没有包含它们。 – Vishnu

+0

_我已经读过,用mapValues代替groupByKey中的reduceByKey会带来性能增益_ - 你看错了。 – zero323

回答

1

我一直在Spark上使用Scala,所以这不会是你可能更喜欢的确切答案。在groupByKey/mapValuesreduceByKey之间编码的主要区别可使用适于从该article一个简单的例子中可以看出:

val words = Array("one", "two", "two", "three", "three", "three") 
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) 

val wordCountsWithGroup = wordPairsRDD. 
    groupByKey. 
    mapValues(_.sum) 
wordCountsWithGroup.collect 
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

val wordCountsWithReduce = wordPairsRDD. 
    reduceByKey(_ + _) 
wordCountsWithReduce.collect 
res2: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

在这个例子中,其中x => x.sum(即_.sum)在mapValues被使用,这将是(acc, x) => acc + x (即_ + _)在reduceByKey。功能签名有很大的不同。在mapValues中,您正在处理分组值的集合,而在reduceByKey中,您正在执行缩减。

+0

据我所知,为了得到一个RDD对的分组列表,我总是需要使用'groupKey',因为'reduceByKey'是用于像sum这样的聚合操作。所以在我的情况下,'reduceByKey'不是正确的吗? – Vishnu

+0

在重新阅读您的问题评论部分的描述之后,我会说'groupByKey'可能是要走的路,因为我不认为还原是一项正确的工具。 –

相关问题