我的任务是编写一个读取大文件(不适合内存)的代码将其逆转并输出最常用的五个单词。
我已经写下了下面的代码,它完成了这项工作。Spark代码优化
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object ReverseFile {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Reverse File")
conf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
val txtFile = "path/README_mid.md"
val txtData = sc.textFile(txtFile)
txtData.cache()
val tmp = txtData.map(l => l.reverse).zipWithIndex().map{ case(x,y) => (y,x)}.sortByKey(ascending = false).map{ case(u,v) => v}
tmp.coalesce(1,true).saveAsTextFile("path/out.md")
val txtOut = "path/out.md"
val txtOutData = sc.textFile(txtOut)
txtOutData.cache()
val wcData = txtOutData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(ascending = false)
wcData.collect().take(5).foreach(println)
}
}
的问题是,我是新来的火花和Scala,并且你可以在代码中看到我第一次读取文件扭转它保存然后读取它扭转和输出的五个最频繁出现的词汇。
- 有没有办法告诉火花保存TMP和工艺wcData(无需保存,打开文件)在同一时间,否则它像读取文件的两倍。
- 从现在开始,我要解决很多火花,所以如果有代码的任何部分(不像绝对路径名......特定的火花),你可能认为可以写得更好i'de欣赏它。
拿(5)收集,为什么? – eliasah
谢谢你的答案,但几个笔记:1 - 没有合并火花将文件保存为分区我想保存为一个,2-采取(5)之前collect()给编译错误(没有足够的参数收集),3 - 你能否给我提供一个关于所有这个地图的教程的链接(_ + _/_._ 2)不需要的东西。非常感谢 – Epsilon
1 - 好的。根据需要更改。 2 - 修正了收集。 3 - https://stackoverflow.com/questions/8000903/what-are-all-the-uses-of-an-underscore-in-scala#8000934 – Reactormonk