2016-01-25 51 views
12

使用scala运行spark工作,正如所料,所有工作都按时完成,但某些INFO日志在工作停止前打印20-25分钟。Spark工作完成,但应用程序需要时间关闭

发布几个UI截图,可以帮助解决问题。

  1. 以下是采取4个阶段时间:

Time taken by 4 stages

  • 以下是连续的作业ID time between consecutive job ids
  • 我之间的时间不明白为什么在这两个工作ID之间花费了很多时间。

    以下是我的代码片段:

    val sc = new SparkContext(conf) 
    for (x <- 0 to 10) { 
        val zz = getFilesList(lin); 
        val links = zz._1 
        val path = zz._2 
        lin = zz._3 
        val z = sc.textFile(links.mkString(",")).map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x").map(t => titan2(t)).filter(t => t.length > 35).map(t => ((t(34)), (t(35), t(5), t(32), t(33)))) 
        val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1))); 
        val t = z.join(way_nodes).map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2)))).reduceByKey((t, y) => t ++ y).map(t => process(t)).flatMap(t => t).combineByKey(createTimeCombiner, timeCombiner, timeMerger).map(averagingFunction).map(t => t._1 + "," + t._2) 
        t.saveAsTextFile(path) 
    } 
    sc.stop() 
    

    一些更随访:spark-1.4.1 saveAsTextFile to S3 is very slow on emr-4.0.0

    +0

    我想一般一个更新的代码建议使用Databricks中的spark-csv包而不是saveAsTextFile,但除此之外,您运行的是哪个Spark版本? –

    +0

    saveAsTextFile的优点是我可以直接在s3上保存所有内容,不知道spark-csv包数据框如何工作。感谢一些方向,无论如何将调查它。 spark - 1.4.1 scala - 2.10.6 – Harshit

    回答

    2

    我最终升级了我的spark版本,问题已解决。

    17

    正如我加入了注释,我建议使用火花CSV包,而不是sc.saveAsTextFile并且没有问题使用该包直接写入s3 :)

    我不知道您是否使用s3或s3n,但也许尝试切换。我在使用Spark 1.5.2(EMR-4.2)上的s3a时遇到了问题,其中写入超时并切换回s3解决了问题,所以值得一试。

    一对夫妇的应该加快其他的东西写入S3 IS使用DirectOutputCommiter

    conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter") 
    

    和禁用代_SUCCESS文件:

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 
    

    请注意,禁用_SUCCESS文件必须是设置为SparkContext的hadoop配置,而不是SparkConf

    我希望这会有所帮助。

    1

    将文件写入S3时遇到同样的问题。我用的是星火2.0版本,只是给你的验证答案

    在星火2.0可以使用,

    val spark = SparkSession.builder().master("local[*]").appName("App_name").getOrCreate() 
    
    spark.conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter") 
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 
    

    这解决了我的工作的问题得到击中

    相关问题