I am trying to union key-hash-partitioned RDDs that are already distributed across our cluster. I don't need to preserve any ordering, or even the partitioning; I just want the union to be as fast as possible. In this case I do need all records, not just the distinct ones, so duplicates must be kept. What is the difference between Apache Spark RDD's union and zipPartitions?
Here is what I would naively use:
val newRDD = tempRDD1.union(tempRDD2)
Here is what someone else recommended to me, on the grounds that it is faster because it takes advantage of how the RDDs are already partitioned and distributed:
val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning = true)((iter, iter2) => iter ++ iter2)
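To make the structural difference between the two calls concrete, here is a hedged sketch that models an RDD's partitions as plain Scala lists (no Spark involved): union simply appends the two partition lists, doubling the partition count without moving data, while zipPartitions pairs partitions index by index (so it requires equal partition counts) and concatenates each pair in place. The `UnionVsZip` object and its helpers are illustrative names, not Spark internals.

```scala
// Hedged model, not real Spark code: an "RDD" is a List of partitions,
// and a partition is a List of elements.
object UnionVsZip {
  type Partition = List[Int]

  // union: the result has all partitions of a followed by all of b
  // (n + m partitions); no element is touched or deduplicated.
  def union(a: List[Partition], b: List[Partition]): List[Partition] =
    a ++ b

  // zipPartitions with (iter, iter2) => iter ++ iter2: partitions are
  // paired by index, so both sides must have the same partition count;
  // the result keeps that count, with each pair concatenated in place.
  def zipPartitions(a: List[Partition], b: List[Partition]): List[Partition] = {
    require(a.length == b.length, "zipPartitions needs equal partition counts")
    a.zip(b).map { case (p1, p2) => p1 ++ p2 }
  }

  def main(args: Array[String]): Unit = {
    val rdd1 = List(List(1, 2), List(3))
    val rdd2 = List(List(4), List(5, 6))
    println(union(rdd1, rdd2))         // 4 partitions: List(List(1, 2), List(3), List(4), List(5, 6))
    println(zipPartitions(rdd1, rdd2)) // 2 partitions: List(List(1, 2, 4), List(3, 5, 6))
  }
}
```

In both cases the multiset of elements is identical; what differs is how they are laid out across partitions afterwards, which is exactly what downstream operations like reduceByKey are sensitive to.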
Which one is faster? And are the results exactly the same, membership-wise?
I am asking because until now I had assumed these methods were equivalent, but when I scaled up my data volume and the number of partitions, executors, memory, and so on, I got strange results from the zipPartitions method, after which reduceByKey no longer worked correctly. Perhaps the difference is due to my RDDs themselves, which have the form ((String, String), (String, Long, Long, Long, Long)), so maybe iter ++ iter2 is doing something to those values beyond a plain union?
Does zipPartitions implicitly do anything extra, like a comparison sort, or re-hashing, or in general implement the merge differently than union does?
Will union vs. zipPartitions return different results if the RDDs contain non-distinct rows, or multiple copies of a key, or if there are empty partitions, or hash collisions between keys, or any other such issues?
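On the duplicates question specifically, a minimal plain-Scala check (no Spark needed) shows that the function passed to zipPartitions, `(iter, iter2) => iter ++ iter2`, is ordinary Iterator concatenation: it keeps duplicate records, performs no sorting or re-hashing, and simply yields every element of the first iterator followed by every element of the second. The sample tuples below only mimic the ((String, String), value) key shape from the question.

```scala
// Sketch: iter ++ iter2 on plain Scala Iterators, mimicking what runs
// inside each zipped partition pair.
object ZipConcatSketch {
  def main(args: Array[String]): Unit = {
    val part1 = Iterator((("a", "b"), 1L), (("a", "b"), 1L)) // duplicate record
    val part2 = Iterator((("c", "d"), 2L))
    val merged = (part1 ++ part2).toList
    println(merged.length) // 3: the duplicate is kept, nothing is deduplicated
    println(merged)        // elements of part1 in order, then elements of part2
  }
}
```

So any divergence you see downstream is unlikely to come from the merge function itself; it is more likely to come from how the resulting partitions line up with the partitioner that reduceByKey assumes.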
Yes, I could run tests myself (in fact, I have been doing so for almost 2 days now!), so please don't post anything silly asking me whether I have tried such-and-such... I am asking this question to better understand what is going on at the code level. Is union implemented as a subcase of zipPartitions?
Later edit: adding some examples with toDebugString results, as recommended by @Holden
val tempIntermediateRDD6 = tempIntermediateRDD1.
  zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter ++ iter2).
  zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter ++ iter2).
  partitionBy(partitioner).
  setName("tempIntermediateRDD6").
  persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated]
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
versus:
val tempIntermediateRDD6 = tempIntermediateRDD1.
  union(tempIntermediateRDD2).
  union(tempIntermediateRDD5).
  partitionBy(partitioner).
  setName("tempIntermediateRDD6").
  persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated]
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated]
// | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
One difference I have already seen is that the RDD types returned by these commands differ: union() returns a ShuffledRDD, while zipPartitions() returns a ZippedPartitionsRDD2. Later in my program I noticed differences in certain operations (e.g. reduceByKey) on these RDD types, so I would like to know what the difference between these RDD types is. –
See also https://issues.apache.org/jira/browse/SPARK-10493 for some additional background and discussion on why I am asking this question –