2016-08-05 25 views
6

我想要RDD表现的动作,如reduce,但不需要操作符可交换。即我希望result将始终是"123456789"RDD中是否有任何操作保持顺序?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

首先,我找到了fold。的RDD#fold的医生说:

DEF折叠(零值:T)(OP:(T,T)⇒T):T骨料 每个分区的所有分区中的元件,并且然后将结果,使用一个 给联想功能和中性“零值”

注意,没有可交换在doc需要。但是,预期的结果并不:

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

编辑我曾尝试通过@ DK14提到,没有运气:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

您错过了文档的下一部分,它描述了您所看到的内容:*“这与在Scala等函数语言中为非分布式集合实现的折叠操作有些不同,这种折叠操作可能适用于然后将这些结果折叠到最终结果中,而不是按照某些定义的顺序将折叠应用于每个元素。对于不可交换的函数,结果可能与应用于非分布式集合的折叠结果不同。“* –

回答

2

没有内置降低满足在斯卡拉这一标准动作,但你可以很容易地通过结合mapPartitionscollect和地方减少实现自己的:

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

使用的collectreduce组合用于合并,而不是由所使用fold异步和无序方法确保全局顺序被保留。

这当然带有包括一些额外的成本:对驾驶员

  • 略高的内存占用。
  • 显着更高的延迟 - 我们明确地等待所有任务完成,然后再开始本地减少。
+0

感谢您的帮助,这是否意味着每个分区**始终是整个RDD的连续子序列**?有没有提到的文件? – Eastsun

+0

关于文档 - 我没有意识到。尽管如此,它或多或少受到某些有序方法的模型和契约的约束。 Spark中真正的问题是如何确定整个序列。一般情况下,有两种情况是当您对订单进行推理时a)使用显式排序(按合同)b)当您有输入时生成确定性有序分割并且在输入和当前点之间没有混洗和其他数据移动。 – zero323

1

正如指出的@YuvalItzchakov fold不保留排序在组合结果时分区为RDD。为了说明这一点考虑合并原RDD一个唯一的分区,

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

应该指出的是,这样做会遭受完全失去计算并行能力的缺点。 –

+0

@YuvalItzchakov确定;使用'fold',排序可能不会保存在分区的'RDD'中。 – elm

+0

是的,我明白了。但是OP应该意识到这一点。 –

相关问题