2014-01-17 102 views
81

RDD'smapmapPartitions方法有什么区别? flatMap的行为如同map还是像mapPartitions?谢谢。Apache Spark:map vs mapPartitions?

(编辑) 即有什么区别(无论是语义或执行方面)

def map[A, B](rdd: RDD[A], fn: (A => B)) 
       (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { 
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, 
     preservesPartitioning = true) 
    } 

之间:

def map[A, B](rdd: RDD[A], fn: (A => B)) 
       (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { 
    rdd.map(fn) 
    } 
+2

阅读下面的后回答,你可以看看[经验]由实际使用它的人共享(https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/) https:// bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ – Abhidemon

回答

78

什么是RDD的地图和mapPartitions之间的区别方法?

方法map源RDD的每个元件转换成通过应用函数RDD结果的单个元素。 mapPartitions将源RDD的每个分区转换为结果的多个元素(可能没有)。

并且flatMap的行为像map还是像mapPartitions?

都不是,flatMap作品的单个元件上(如map)和产生的结果的多个元件(如mapPartitions)。

+1

谢谢 - 也是如此MAP原因洗牌(或以其他方式改变分区的数量)?它是否在节点之间移动数据?我一直在使用mapPartitions来避免在节点之间移动数据,但不确定flapMap是否会这样做。 –

+0

如果你看看源 - https://github.com/apache/incubator-spark/blob/97ac06018206b593600594605be241d0cd706e08/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala和https:/ /github.com/apache/incubator-spark/blob/97ac06018206b593600594605be241d0cd706e08/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala - 既'map'和'flatMap'具有完全相同的分区,同父母。 –

+11

作为一个说明,通过扬声器在2013旧金山星火峰会(goo.gl/JZXDCR)提供的演示重点介绍了单位记录开销任务执行与mapPartition比与地图转化更好。据介绍,这是由于设立新任务的成本高昂。 –

48

Imp。提示:

只要你有重量级的初始化应该对许多RDD元素,而不是每个RDD元素进行一次 完成,如果这 初始化,如创建从第三方 库对象,不能被序列化(以便Spark可以通过集群上的 将其传输到工作节点),使用mapPartitions()而不是 map()mapPartitions()规定初始化完成 每个工作者任务/线程/分区而不是一次RDD数据 元素为example :见下文。

val newRd = myRdd.mapPartitions(partition => { 
    val connection = new DbConnection /*creates a db connection per partition*/ 

    val newPartition = partition.map(record => { 
    readMatchingFromDB(record, connection) 
    }).toList // consumes the iterator, thus calls readMatchingFromDB 

    connection.close() // close dbconnection here 
    newPartition.iterator // create a new iterator 
}) 

Q2。 确实flatMap表现如同地图或像mapPartitions

是的。请参阅flatmap的示例2 ..其自我解释。

Q1。什么之间的RDD的mapmapPartitions

map的作品被使用在每个单元级的功能,而 mapPartitions行使在分区级别功能的差异。

示例方案如果我们有在特定RDD分区100K元件那么我们将会触发关闭功能正在使用的映射变换100K时候,我们使用map。相反,如果我们使用mapPartitions,那么我们将只调用一次特定函数,但是我们将传递所有100K记录并在一次函数调用中取回所有响应。

自从map在特定函数上工作很多次以来,性能会有所提高,特别是如果函数每次都会花费很多代价,如果我们一次传入所有元素,案例mappartitions)。

地图

适用于RDD的每个项目转换函数,并返回 结果作为新RDD。

清单异体

DEF映射[U:ClassTag](F:T => U):RDD [U]

实施例:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) 
val b = a.map(_.length) 
val c = a.zip(b) 
c.collect 
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

这是一个专门的地图,被称为每个分区只有一次。 通过输入参数(Iterarator [T]),各个分区的全部内容可作为 顺序值的数据流提供。 自定义函数必须返回另一个Iterator [U]。结合的 结果迭代器会自动转换为新的RDD。请注意,由于我们选择了分区,因此下面的 结果中缺少元组(3,4)和(6,7)。

preservesPartitioning指示输入功能是否保留 分割器,这应该是false除非这是一双RDD和输入 函数不修改的键。

清单异体

DEF mapPartitions [U:ClassTag](F:迭代[T] =>迭代[U], preservesPartitioning:布尔=假):RDD [U]

实施例1

val a = sc.parallelize(1 to 9, 3) 
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { 
    var res = List[(T, T)]() 
    var pre = iter.next 
    while (iter.hasNext) 
    { 
    val cur = iter.next; 
    res .::= (pre, cur) 
    pre = cur; 
    } 
    res.iterator 
} 
a.mapPartitions(myfunc).collect 
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

实施例2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) 
def myfunc(iter: Iterator[Int]) : Iterator[Int] = { 
    var res = List[Int]() 
    while (iter.hasNext) { 
    val cur = iter.next; 
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) 
    } 
    res.iterator 
} 
x.mapPartitions(myfunc).collect 
// some of the number are not outputted at all. This is because the random number generated for it is zero. 
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

上述程序还可以使用flatMap如下写入。使用flatmap

val x = sc.parallelize(1 to 10, 3) 
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect 

res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

结论

例2:

mapPartitions转型比map更快,因为它要求你的函数次/分,没有一次/元..

+2

我知道你可以使用'map'或'mapPartitions'来实现相同的结果(参见问题中的两个例子);这个问题是关于你为什么选择一种方式。其他答案中的评论非常有用!此外,你没有提到'map'和'flatMap'将'false'传递给'preservesPartitioning',以及它的含义是什么。 –

+1

函数每次执行与函数执行一次parition是我失踪的链接。一次使用mapPartition访问多个数据记录是非常宝贵的事情。欣赏答案 –

+0

有没有'map'比'mapPartitions'更好的场景?如果'mapPartitions'非常好,为什么它不是默认的地图实现? – ruhong

4

地图

  1. 它处理在一次一行,非常类似于映射()的MapReduce的方法。
  2. 您从每一行后的转换中返回。

MapPartitions

  1. 它处理完整区块一气呵成。
  2. 您可以处理整个分区后从函数返回只有一次。
  3. 所有的中间结果需要在内存中保存,直到你处理整个分区。
  4. 提供您喜欢的设置()图()和清理()的MapReduce功能

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Maphttp://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

+0

- 如果您正在执行迭代器到迭代器的转换,而没有将迭代器实现为某种类型的集合,那么您将不必将整个分区保存在内存中,实际上,这种方式可以实现spark将部分分区泄露到磁盘。 – ilcord

+0

你不必在内存中保存整个分区,但结果。直到处理完整个分区后才能返回结果 – KrazyGautam