2015-09-09 58 views
3

我试图联合到已经在我们的集群上的键散列分区分布RDDS。我不需要保留任何顺序或甚至分区,我只是希望工会尽可能快。在这个例子中,我确实需要所有记录,而不仅仅是不同的记录,但保持多样性。Apache Spark RDD的union和zipPartition有什么区别?

这里是什么,我会简单地使用:

val newRDD = tempRDD1.union(tempRDD2) 

这里是别人推荐给我的印象是速度更快,因为它利用的RDDS如何已经划分和分配:

val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2) 

哪个更快?结果是否完全一致,会员方面?

我问这个,因为直到现在我还以为这些方法是等价的,但是当我提高了我的数据量和分区数量,执行程序,内存等等时,我得到了奇怪的zipPartitions方法结果,之后不能正确使用reduceByKey。或许我的差异是由于我的RDD本身,它们的形式是((String,String),(String,Long,Long,Long,Long)),所以也许iter ++ iter2除了工会之外还做了其他的事情那些价值?

是zipPartitions隐含做任何额外的东西,像一个比较排序,或重新散列东西,或一般不同实施合并比工会吗?

如果RDD包含非不同行,或多个密钥副本,或者存在空分区或密钥的散列冲突或任何其他此类问题,union-vs-zipPartitions会返回不同的结果吗?

是的,我可以运行测试自己(其实,我已经这样做了近2天了!),所以请不要发布任何愚蠢的事问我,如果我已经试过这样的,和这样的.. 。我正在问这个问题,以便更好地理解代码级下的情况。是否将“union”书写为“zipPartitions”的子项?

后来编辑:加入与toDebugString结果一些例子,所推荐的@Holden

val tempIntermediateRDD6 = tempIntermediateRDD1. 
    zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2). 
    zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2). 
    partitionBy(partitioner). 
    setName("tempIntermediateRDD6"). 
    persist(StorageLevel.MEMORY_AND_DISK_SER) 

tempIntermediateRDD6.checkpoint 

println(tempIntermediateRDD6.toDebugString) 

// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated] 
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated] 
// |  CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated] 
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated] 
// |  CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated] 

与:

val tempIntermediateRDD6 = tempIntermediateRDD1. 
    union(tempIntermediateRDD2). 
    union(tempIntermediateRDD5). 
    partitionBy(partitioner). 
    setName("tempIntermediateRDD6"). 
    persist(StorageLevel.MEMORY_AND_DISK_SER) 

tempIntermediateRDD6.checkpoint 

println(tempIntermediateRDD6.toDebugString) 

// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated] 
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated] 
//  | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated] 
//  |  CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
//  | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated] 
//  | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated] 
//  |  CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
//  | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated] 
+0

一个区别我已经看到的是,这些命令返回的RDD类型是不同的:工会()返回一个ShuffledRDD,而zipPartitions()返回一个ZippedPartitionsRDD2。我稍后在程序中注意到这些RDD类型的某些操作(例如reduceByKey)的差异,所以我想知道这些RDD类型有什么区别。 –

+0

另请参见https://issues.apache。org/jira/browse/SPARK-10493的一些额外的背景和讨论,为什么我问这个问题 –

回答

4

联盟返回一个专门UnionRDD,我们可以看到它是怎么写的在Spark项目中查看UnionRDD.scala。看着它,我们可以看到,Union使用的代码块实际执行:

override def getPartitions: Array[Partition] = { 
    val array = new Array[Partition](rdds.map(_.partitions.length).sum) 
    var pos = 0 
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { 
     array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) 
     pos += 1 
    } 
    array 
    } 

如果你好奇,什么底层的计算看起来像一个RDD我推荐使用上造成的RDDS功能toDebugString你可以看到依赖DAG的样子。

+0

谢谢,这是部分有用的。我已经使用过toDebugString了,但是对于我的RDD来说,它并没有吐出看到幕后发生的事情所需的详细程度。我将在上面编辑我的问题,并发布一个带有结果的示例.DebugStrings –

+0

Yah toDebugString并不总是足够的,但是您可以经常使用那个+来查看所引用的类,以了解发生了什么。 – Holden

相关问题