2017-01-22 53 views
3

我正在YARN客户端模式下运行Spark应用程序,其中包含六个执行程序(每个四个内核和执行程序内存= 6 GB,开销= 4 GB,Spark版本为1.6.3/2.1.0)。对YARN上的Spark应用程序,物理内存使用量不断增加

我发现我的执行程序内存不断增加,直到被节点管理器杀死;并且它提供了告诉我提高spark.yarn.excutor.memoryOverhead的信息。

我知道这个参数主要控制堆内分配的内存大小。但是我不知道Spark引擎何时以及如何使用这部分内存。增加这部分内存并不总能解决我的问题。有时它有效,有时不起作用。当输入数据很大时,它趋于无用。我的应用程序的逻辑非常简单。它意味着将一天内生成的小文件(一天一个目录)合并为一个文件并写回HDFS。这里是核心代码:

val df = spark.read.parquet(originpath) 
       .filter(s"m = ${ts.month} AND d = ${ts.day}") 
       .coalesce(400) 
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d") 

dropDF.repartition(1).write 
     .mode(SaveMode.ErrorIfExists) 
     .parquet(targetpath) 

源文件可能有几百到几千层的分区。总拼花地板文件大约是1到5   GB。

另外我发现,在洗牌读取不同机器数据的步骤中,洗牌读取的大小比输入大小大四倍,这是有线或我不知道的一些原理。

无论如何,我已经做了一些自己的搜索这个问题。有些文章说它在直接缓冲存储器上(我没有设置自己)。

有文章说人们用更频繁的全GC解决它。

而且,我觉得一个人对堆栈溢出 一个非常类似的情况:Ever increasing physical memory for a Spark application in YARN

这家伙声称,它与木一个错误,但评论质疑他。在此邮件列表的人也可能会收到一封电子邮件小时前从一边写JSON谁形容这个问题blondowski:Executors - running out of memory

所以看起来是不同的输出格式,常见的问题。

我希望有经验的人可以对这个问题作出解释。为什么会发生这种情况,解决这个问题的可靠方法是什么?

+0

我会首先说''repartition(1)'和/或'coalesce(1)'主要是Spark中的反模式,除非您的数据非常小,那么您可以收集结果并在平常中写入方式。 – eliasah

+0

@eliasah有没有另一种有效的方式来完成我的组合工作? –

+0

为什么你需要把一切都放在一个实木复合地板文件中? – eliasah

回答

1

我只是跟我的同事做了一些调查。下面是我的想法:从Spark 1.2开始,我们使用Netty和堆外存储器来在洗牌和缓存块传输期间减少GC。在我的情况下,如果我尝试增加足够大的内存开销。我会得到最大直接缓冲区异常。当Netty阻止传输时,默认情况下会有五个线程将数据块抓取到目标执行程序。在我的情况下,单个块太大而无法放入缓冲区。所以gc在这里没有帮助。我的最终解决方案是在重新分区之前做另一次重新分区(1)。只是制作比原来的分区多10倍的分区。这样,我可以减少每个块的Netty传输的大小。这样我终于可以做到了。

另外我想说的是,将大数据集重新分区为单个文件并不是一个好的选择。这种极不平衡的情况有点浪费您的计算资源。

欢迎来到任何评论,我还是不太了解这部分。

相关问题