2016-01-22 116 views
16

当我与1点GB的数据集它完成没有任何错误运行分析代码。但是,当我试图数据的25 GB的时候,我得到下面的错误。我想了解如何避免以下失败。很高兴听到任何建议或想法。FetchFailedException或MetadataFetchFailedException当处理大数据集

不同的充错误,

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx 

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094} 

群集详细说明:

纱线:8个节点
核心总数:64
内存:500 GB
火花版本:1.5

星火提交声明:

spark-submit --master yarn-cluster \ 
         --conf spark.dynamicAllocation.enabled=true \ 
         --conf spark.shuffle.service.enabled=true \ 
         --executor-memory 4g \ 
         --driver-memory 16g \ 
         --num-executors 50 \ 
         --deploy-mode cluster \ 
         --executor-cores 1 \ 
         --class my.parser \ 
         myparser.jar \ 
         -input xxx \ 
         -output xxxx \ 

一个堆栈跟踪:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) 
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) 
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456) 
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183) 
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47) 
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:88) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

回答

48

这个错误几乎肯定会被记忆的问题在你的遗嘱执行人造成的。我能想到几种方法来解决这些类型的问题。

1)你可以尝试运行更多的分区(在你的dataframe上做一个repartition)。存储器问题通常出现在一个或多个分区包含比将适合存储器更多的数据。

2)我注意到你没有明确设置spark.yarn.executor.memoryOverhead,所以它会默认为max(386, 0.10* executorMemory),在你的情况下它将是400MB。这对我来说听起来很低。我会尝试将其增加到1GB(注意,如果将memoryOverhead增加到1GB,则需要将--executor-memory降至3GB)

3)查找发生故障的节点上的日志文件。你想寻找文字“杀死容器”。如果你看到文本“运行超出物理内存限制”,增加memoryOverhead会 - 在我的经验 - 解决问题。

+0

是否是数字2)也以独立模式计数。如果是的话,我们如何设置它。我无法在独立模式下找到类似的变量。 – Laeeq

3

我也有一些好的结果,将Spark timeout spark.network.timeout增加到800这样的较大值。默认的120秒会导致很多执行程序在重负载时超时。