我有一个36个元素的RDD。我有一个3个节点的集群,每个节点有4个核心。我已经将RDD重新分区为36个部分,以便每个分区可能有一个要处理的元素,但是整个36个元素都是分区的,这样只有4个部分各有9个元素,其余部分都是空的,因此没有任何可处理的内容和服务器资源没有得到充分利用。如何确保RDD的每个分区都有一些数据
如何重新分区数据以确保每个部分都有一些要处理的数据?我怎样才能确保每个部件都有3个要素进行处理?
我有一个36个元素的RDD。我有一个3个节点的集群,每个节点有4个核心。我已经将RDD重新分区为36个部分,以便每个分区可能有一个要处理的元素,但是整个36个元素都是分区的,这样只有4个部分各有9个元素,其余部分都是空的,因此没有任何可处理的内容和服务器资源没有得到充分利用。如何确保RDD的每个分区都有一些数据
如何重新分区数据以确保每个部分都有一些要处理的数据?我怎样才能确保每个部件都有3个要素进行处理?
通过定义,repartition(numPartitions)
改组在RDD数据随机产生更多或更少的分区并在它们之间进行平衡,它总是在网络上慢腾腾的所有数据。
Apache Spark给出的保证是均匀分布的,但这不会为每个分区产生完全相同数量的元素。 (也即数据集的大小是非常小的!)
您可以考虑使用HashPartitioner
:
scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length))
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int]
scala> countByPartition(rdd).collect
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)
scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3)
我已经借了从zero323的答案的例子和助手约How does HashPartitioner work?
我希望这有助于!
编辑:
如果你会做以下几点:
scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29
scala> countByPartition(rdd).collect
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)
结果不会是一定相同。
感谢您的答复。如果我在第一行本身完成了val rdd = sc.parallelize(对于{x < - 1至36} yield(x,None),12),我是否会得到与countByPartition相同的结果(rdd.partitionBy(new HashPartitioner (12)))。collect –
不可以。你必然会得到相同的结果。 – eliasah
你使用'coalesce'还是'repartition'?我想这也可能是因为你的元素很少。 – philantrovert
我正在使用重新分区。是的,我的元素太少,在这种情况下只有36个。但是每个元素都有很多处理要做。我希望每个分区都有一些数据,而不是不均匀的重新分区 –
@philantrovert是否有解决此问题的方法,因为我拥有数百万条记录,但有些分区根本没有接收数据,有些分区的数据可能多达5个 –