2015-11-07 127 views
1

我的任务是编写一个读取大文件(不适合内存)的代码将其逆转并输出最常用的五个单词。
我已经写下了下面的代码,它完成了这项工作。Spark代码优化

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object ReverseFile { 
    def main(args: Array[String]) { 


    val conf = new SparkConf().setAppName("Reverse File") 
    conf.set("spark.hadoop.validateOutputSpecs", "false") 
    val sc = new SparkContext(conf) 
    val txtFile = "path/README_mid.md" 
    val txtData = sc.textFile(txtFile) 
    txtData.cache() 

    val tmp = txtData.map(l => l.reverse).zipWithIndex().map{ case(x,y) => (y,x)}.sortByKey(ascending = false).map{ case(u,v) => v} 

    tmp.coalesce(1,true).saveAsTextFile("path/out.md") 

    val txtOut = "path/out.md" 
    val txtOutData = sc.textFile(txtOut) 
    txtOutData.cache() 

    val wcData = txtOutData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(ascending = false) 
    wcData.collect().take(5).foreach(println) 


    } 
} 

的问题是,我是新来的火花和Scala,并且你可以在代码中看到我第一次读取文件扭转它保存然后读取它扭转和输出的五个最频繁出现的词汇。

  • 有没有办法告诉火花保存TMP和工艺wcData(无需保存,打开文件)在同一时间,否则它像读取文件的两倍。
  • 从现在开始,我要解决很多火花,所以如果有代码的任何部分(不像绝对路径名......特定的火花),你可能认为可以写得更好i'de欣赏它。

回答

2
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object ReverseFile { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("Reverse File") 
    conf.set("spark.hadoop.validateOutputSpecs", "false") 
    val sc = new SparkContext(conf) 
    val txtFile = "path/README_mid.md" 
    val txtData = sc.textFile(txtFile) 
    txtData.cache() 

    val reversed = txtData 
     .zipWithIndex() 
     .map(_.swap) 
     .sortByKey(ascending = false) 
     .map(_._2) // No need to deconstruct the tuple. 

    // No need for the coalesce, spark should do that by itself. 
    reversed.saveAsTextFile("path/reversed.md") 

    // Reuse txtData here. 
    val wcData = txtData 
     .flatMap(_.split(" ")) 
     .map(word => (word, 1)) 
     .reduceByKey(_ + _) 
     .map(_.swap) 
     .sortByKey(ascending = false) 

    wcData 
     .take(5) // Take already collects. 
     .foreach(println) 
    } 
} 

始终执行collect()最后,使星火可以评估集群上的东西。

+0

拿(5)收集,为什么? – eliasah

+0

谢谢你的答案,但几个笔记:1 - 没有合并火花将文件保存为分区我想保存为一个,2-采取(5)之前collect()给编译错误(没有足够的参数收集),3 - 你能否给我提供一个关于所有这个地图的教程的链接(_ + _/_._ 2)不需要的东西。非常感谢 – Epsilon

+1

1 - 好的。根据需要更改。 2 - 修正了收集。 3 - https://stackoverflow.com/questions/8000903/what-are-all-the-uses-of-an-underscore-in-scala#8000934 – Reactormonk

2

代码中最昂贵的部分是排序,因此明显的改进就是将其删除。在完全过时的第二种情况下,这是相对简单的:

val wcData = txtData 
    .flatMap(_.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // No need to swap or sort 

// Use top method and explicit ordering in place of swap/sortByKey 
val wcData = top(5)(scala.math.Ordering.by[(String, Int), Int](_._2)) 

行的换向顺序有点棘手。首先让每个分区重新排序元素:

val reversedPartitions = txtData.mapPartitions(_.toList.reverse.toIterator) 

现在你有两个选择

  • 使用自定义分区

    class ReversePartitioner(n: Int) extends Partitioner { 
        def numPartitions: Int = n 
        def getPartition(key: Any): Int = { 
        val k = key.asInstanceOf[Int] 
        return numPartitions - 1 - k 
        } 
    } 
    
    val partitioner = new ReversePartitioner(reversedPartitions.partitions.size) 
    
    val reversed = reversedPartitions 
        // Add current partition number 
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toList))) 
        // Repartition to get reversed order 
        .partitionBy(partitioner) 
        // Drop partition numbers 
        .values 
        // Reshape 
        .flatMap(identity) 
    

    它仍然需要洗牌,但它是相对便携,数据仍然是可以访问在记忆中。

  • 如果您只想保存反转的数据,您可以拨打saveAsTextFile并在reversedPartitions上进行逻辑重新排序。由于part-n名称格式标识源分区,所以您只需将part-n重命名为part-(number-of-partitions - 1 -n)即可。它需要保存数据,因此它不是最优的,但如果你使用内存中的文件系统可以是一个很好的解决方案。

+0

整洁。我正在考虑Partitioner的魔法,但不知道如何实现它。 – Reactormonk