2016-03-04 153 views
5

假设我创造这样的RDD(我用Pyspark):Spark如何决定如何分区RDD?

list_rdd = sc.parallelize(xrange(0, 20, 2), 6) 

然后我打印与glom()方法分区元素,并获得

[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]] 

是如何星火决定如何分割我的列表?元素的具体选择从哪里来?它可以以不同的方式耦合它们,留下除0和10之外的其他元素,以创建6个请求的分区。第二次运行时,分区是相同的。

使用较大的范围内,29元件,我得到分区中接着是三个元件2个元件的图案:

list_rdd = sc.parallelize(xrange(0, 30, 2), 6) 
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]] 

使用较小的范围9种元素的我得到

list_rdd = sc.parallelize(xrange(0, 10, 2), 6) 
[[], [0], [2], [4], [6], [8]] 

所以我推断Spark是通过将列表分割成一个配置来生成分区,其中最小的可能是后面跟着更大的集合,然后重复。

问题是,如果这个选择背后有一个原因,这是非常优雅的,但它也提供性能优势?

回答

2

除非您指定了特定的分区程序,否则这是“随机的”,因为它取决于该RDD的特定实现。在这种情况下,您可以前往ParallelCollectionsRDD进一步深入研究。

getPartitions被定义为:

val slices = ParallelCollectionRDD.slice(data, numSlices).toArray 
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray 

其中slice被注释掉的(重新格式化,以更好地适应):

/** 
* Slice a collection into numSlices sub-collections. 
* One extra thing we do here is to treat Range collections specially, 
* encoding the slices as other Ranges to minimize memory cost. 
* This makes it efficient to run Spark over RDDs representing large sets of numbers. 
* And if the collection is an inclusive Range, 
* we use inclusive range for the last slice. 
*/ 

注意,有一些注意事项与问候的记忆。所以,这又将是具体实施。

相关问题