2015-04-30 59 views
18

正如大家都知道Spark中的分区对任何“广泛”操作都有巨大的性能影响,所以它通常是在操作中定制的。我用下面的代码进行试验:在Apache Spark中,为什么RDD.union不保存分区程序?

val rdd1 = 
    sc.parallelize(1 to 50).keyBy(_ % 10) 
    .partitionBy(new HashPartitioner(10)) 
val rdd2 = 
    sc.parallelize(200 to 230).keyBy(_ % 13) 

val cogrouped = rdd1.cogroup(rdd2) 
println("cogrouped: " + cogrouped.partitioner) 

val unioned = rdd1.union(rdd2) 
println("union: " + unioned.partitioner) 

我看到,在默认情况下cogroup()总是产生一个RDD使用自定义分区,但union()不,它总是会恢复到默认值。这是违反直觉的,因为我们通常认为PairRDD应该使用它的第一个元素作为分区键。有没有办法“强制”Spark来合并2个PairRDD来使用相同的分区键?

回答

33

union是一个非常高效的操作,因为它不会移动任何数据。如果rdd1有10个分区,并且rdd2有20个分区,则rdd1.union(rdd2)将有30个分区:两个RDD的分区彼此接连。这只是一个簿记变化,没有洗牌。

但是必然会丢弃分区器。分区器是为给定数量的分区构建的。由此产生的RDD有许多分区,它们都不同于rdd1rdd2

参加工会后,您可以运行repartition来洗牌数据并按键组织它。


上面有一个例外。如果rdd1rdd2具有相同的分区(具有相同数量的分区),则union的行为不同。它将逐个加入两个RDD的分区,并为每个输入分配相同数量的分区。这可能涉及移动数据(如果分区不在同一地点),但不涉及洗牌。在这种情况下,分区程序被保留。 (代码为PartitionerAwareUnionRDD.scala。)

+4

实际上存在分区识别联合RDD,我认为它应该在分区可以保留的情况下自动使用;不知道为什么它不适用于此。请参阅https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123和https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala –

+0

哇,很酷!从来不知道这一点。看起来只有当两个RDD具有相同的分区器时才使用它。我会将其添加到答案中,谢谢! –

+0

非常感谢!这是一个非常重要的优化。顺便说一句,如果这不是最佳的所有情况下,我总是可以写一个邮编+分区联盟 – tribbloid

相关问题