2017-05-24 44 views
1

我的目标是处理一系列通过调用org.apache.spark.rdd.RDD[_].saveAsObjectFile(...)产生SequenceFile文件夹。我的文件夹结构与此类似:范围界定问题的foreach斯卡拉

\MyRootDirectory 
    \Batch0001 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 
    \Batch0002 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 
    ... 
    \Batchnnnn 
    _SUCCESS 
    part-00000 
    part-00001 
    ... 
    part-nnnnn 

我需要提取一些持久的数据,但是我的收藏 - 我是否使用ListBuffermutable.Map,或任何其他可变类型,失去范围,似乎是newed上来就sequenceFile(...).foreach

每次迭代概念的以下证明产生了一系列的“处理目录......”接着是“1:1”的反复,从不增加,如我所料counterintList.size做。

private def proofOfConcept(rootDirectoryName: String) = { 
    val intList = ListBuffer[Int]() 
    var counter: Int = 0 
    val config = new SparkConf().setAppName("local").setMaster("local[1]") 
    new File(rootDirectoryName).listFiles().map(_.toString).foreach { folderName => 
     println(s"Processing directory $folderName...") 
     val sc = new SparkContext(config) 
     sc.setLogLevel("WARN") 
     sc.sequenceFile(folderName, classOf[NullWritable], classOf[BytesWritable]).foreach(f => { 
     counter += 1 
     intList += counter 
     println(s" $counter : ${intList.size}") 
     }) 
     sc.stop() 
    } 
    } 

输出:

"C:\Program Files\Java\jdk1.8.0_111\bin\java" ... 
Processing directory C:\MyRootDirectory\Batch0001... 
17/05/24 09:30:25.228 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
[Stage 0:>               (0 + 0)/57] 1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
Processing directory C:\MyRootDirectory\Batch0002... 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
Processing directory C:\MyRootDirectory\Batch0003... 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
    1 : 1 
+0

https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures – zero323

+0

当Spark不在图片中时,您是否看到这个? –

回答

1

内部foreach功能是在火花工人JVM运行,而不是在客户端JVM,其中变量定义内。该工作人员在本地获取该变量的副本,将其增加并打印出来。我的猜测是你在本地测试这个吗?如果你是在生产,分配火花环境中运行这个,你甚至看不到那些打印输出。

更普遍,几乎所有的功能,您传递到可能会被远程实际执行的RDD的方法之一,将不会有任何局部变量或任何可变的访问。它会得到一个基本不变的快照。

如果你想将数据从火花的分布式存储移回客户端,使用RDD的collect方法。反向与sc.parallelize完成。但是请注意,这两种通常做得非常罕见,因为它们不并行发生。