我的目标是处理一系列通过调用org.apache.spark.rdd.RDD[_].saveAsObjectFile(...)
产生SequenceFile
文件夹。我的文件夹结构与此类似:范围界定问题的foreach斯卡拉
\MyRootDirectory
\Batch0001
_SUCCESS
part-00000
part-00001
...
part-nnnnn
\Batch0002
_SUCCESS
part-00000
part-00001
...
part-nnnnn
...
\Batchnnnn
_SUCCESS
part-00000
part-00001
...
part-nnnnn
我需要提取一些持久的数据,但是我的收藏 - 我是否使用ListBuffer
,mutable.Map
,或任何其他可变类型,失去范围,似乎是newed上来就sequenceFile(...).foreach
每次迭代概念的以下证明产生了一系列的“处理目录......”接着是“1:1”的反复,从不增加,如我所料counter
和intList.size
做。
private def proofOfConcept(rootDirectoryName: String) = {
val intList = ListBuffer[Int]()
var counter: Int = 0
val config = new SparkConf().setAppName("local").setMaster("local[1]")
new File(rootDirectoryName).listFiles().map(_.toString).foreach { folderName =>
println(s"Processing directory $folderName...")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.sequenceFile(folderName, classOf[NullWritable], classOf[BytesWritable]).foreach(f => {
counter += 1
intList += counter
println(s" $counter : ${intList.size}")
})
sc.stop()
}
}
输出:
"C:\Program Files\Java\jdk1.8.0_111\bin\java" ...
Processing directory C:\MyRootDirectory\Batch0001...
17/05/24 09:30:25.228 WARN [main] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:> (0 + 0)/57] 1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
Processing directory C:\MyRootDirectory\Batch0002...
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
Processing directory C:\MyRootDirectory\Batch0003...
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
1 : 1
https://spark.apache.org/docs/latest/programming-guide.html#understanding-closures – zero323
当Spark不在图片中时,您是否看到这个? –