Imp。提示:
只要你有重量级的初始化应该对许多RDD
元素,而不是每个RDD
元素进行一次 完成,如果这 初始化,如创建从第三方 库对象,不能被序列化(以便Spark可以通过集群上的 将其传输到工作节点),使用mapPartitions()
而不是 map()
。 mapPartitions()
规定初始化完成 每个工作者任务/线程/分区而不是一次RDD
数据 元素为example :见下文。
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2。 确实flatMap
表现如同地图或像mapPartitions
?
是的。请参阅flatmap
的示例2 ..其自我解释。
Q1。什么之间的RDD的map
和mapPartitions
map
的作品被使用在每个单元级的功能,而 mapPartitions
行使在分区级别功能的差异。
示例方案:如果我们有在特定RDD
分区100K元件那么我们将会触发关闭功能正在使用的映射变换100K时候,我们使用map
。相反,如果我们使用mapPartitions
,那么我们将只调用一次特定函数,但是我们将传递所有100K记录并在一次函数调用中取回所有响应。
自从map
在特定函数上工作很多次以来,性能会有所提高,特别是如果函数每次都会花费很多代价,如果我们一次传入所有元素,案例mappartitions
)。
地图
适用于RDD的每个项目转换函数,并返回 结果作为新RDD。
清单异体
DEF映射[U:ClassTag](F:T => U):RDD [U]
实施例:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
这是一个专门的地图,被称为每个分区只有一次。 通过输入参数(Iterarator [T]),各个分区的全部内容可作为 顺序值的数据流提供。 自定义函数必须返回另一个Iterator [U]。结合的 结果迭代器会自动转换为新的RDD。请注意,由于我们选择了分区,因此下面的 结果中缺少元组(3,4)和(6,7)。
preservesPartitioning
指示输入功能是否保留 分割器,这应该是false
除非这是一双RDD和输入 函数不修改的键。
清单异体
DEF mapPartitions [U:ClassTag](F:迭代[T] =>迭代[U], preservesPartitioning:布尔=假):RDD [U]
实施例1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
实施例2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上述程序还可以使用flatMap如下写入。使用flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
结论
例2:
mapPartitions
转型比map
更快,因为它要求你的函数次/分,没有一次/元..
阅读下面的后回答,你可以看看[经验]由实际使用它的人共享(https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/) https:// bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ – Abhidemon