2017-08-21 23 views
1

我有一个36个元素的RDD。我有一个3个节点的集群,每个节点有4个核心。我已经将RDD重新分区为36个部分,以便每个分区可能有一个要处理的元素,但是整个36个元素都是分区的,这样只有4个部分各有9个元素,其余部分都是空的,因此没有任何可处理的内容和服务器资源没有得到充分利用。如何确保RDD的每个分区都有一些数据

如何重新分区数据以确保每个部分都有一些要处理的数据?我怎样才能确保每个部件都有3个要素进行处理?

+0

你使用'coalesce'还是'repartition'?我想这也可能是因为你的元素很少。 – philantrovert

+0

我正在使用重新分区。是的,我的元素太少,在这种情况下只有36个。但是每个元素都有很多处理要做。我希望每个分区都有一些数据,而不是不均匀的重新分区 –

+0

@philantrovert是否有解决此问题的方法,因为我拥有数百万条记录,但有些分区根本没有接收数据,有些分区的数据可能多达5个 –

回答

5

通过定义repartition(numPartitions)改组在RDD数据随机产生更多或更少的分区并在它们之间进行平衡,它总是在网络上慢腾腾的所有数据。

Apache Spark给出的保证是均匀分布的,但这不会为每个分区产生完全相同数量的元素。 (也即数据集的大小是非常小的!)

您可以考虑使用HashPartitioner

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27 

scala> import org.apache.spark.rdd.RDD 
import org.apache.spark.rdd.RDD 

scala> import org.apache.spark.HashPartitioner 
import org.apache.spark.HashPartitioner 

scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length)) 
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int] 

scala> countByPartition(rdd).collect 
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5) 

scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect 
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3) 

我已经借了从zero323的答案的例子和助手约How does HashPartitioner work?

我希望这有助于!

编辑:

如果你会做以下几点:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29 

scala> countByPartition(rdd).collect 
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5) 

结果不会是一定相同。

+0

感谢您的答复。如果我在第一行本身完成了val rdd = sc.parallelize(对于{x < - 1至36} yield(x,None),12),我是否会得到与countByPartition相同的结果(rdd.partitionBy(new HashPartitioner (12)))。collect –

+0

不可以。你必然会得到相同的结果。 – eliasah

相关问题