0

是我缓存的理解错了吗?在我所有的转换之后,得到的RDD非常小,比如1GB。它计算的数据非常大,大小约700 GB。星火电子病历“超出内存限制”可用于检查点/缓存工作

我要运行的逻辑阅读成千上万的相当大的文件,所有计算小得多导致RDD。每次迭代都会处理下一批400个文件,这些文件在读入时可能会炸毁大约700 GB的大小。传入的RDD以相同的方式进行处理(读取和转换),然后与积累的RDD合并。 I 缓存和检查点每次迭代后(也是非运行(阻塞= true)旧版本的结果rdd),以便我可以削减RDD谱系,这样我就不必重新计算结果出错,并节省执行人员的空间。 所以,我想,在任何时候我真的只需要1 GB *迭代+〜750GB的内存总容量为我的工作,而1.6 TB应该是绰绰有余的数量。但显然我误解了一些东西。

在每次迭代中,GC的时间越来越长。 Spark UI显示执行者位于红色区域(在GC上花费时间超过10%)。然后,整个工作或许未能在第三或第四迭代与像内存限制超过消息,失落执行人/无路径执行人,和纱线杀死了我的执行人。我认为通过缓存和检查点,我为执行者节省了大量空间。我只是不明白是否有某种内存泄漏? 为什么内存继续填满?

我在EMR运行星火2.1.1 m3.large实例。我的群集大小限制在〜1.6TB。我用下面的配置中运行:

driver-memory 8g 
deploy-mode cluster 
spark.dynamicAllocation.enabled=true 
spark.dynamicAllocation.minExecutors=100 
spark.dynamicAllocation.maxExecutors=200 
spark.shuffle.service.enabled=true 
executor-cores 4 
executor-memory 8g 

什么我的代码看起来有点像:

var accRdd = <empty> 
val batchSize = 400 
var iteration = 1 
filesToIngest.grouped(batchSize).foreach { 
    val transformedRdd = transform(accRdd).reduceByKey((row1, row2) => 
     combine(row1, row2) 
    ) 
    val oldAccRdd = accRdd 
    accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) => 
     combine(row1, row2) 
    ).coalesce(5 + i) 
    accRdd.persist(MEMORY_AND_DISK_SER) 
    accRdd.checkpoint() 
    oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory 
    log_info(s"Total row count on iteration: ${accRdd.count()}") 
    iteration += 1 
} 

我已经按照此建议:https://github.com/deeplearning4j/nd4j/issues/1251,并正尝试以避免调整其它配置变量相关以gc,记忆分数和jvm。再次,我正在寻找对可能发生的事情的解释,以及我对缓存/检查点的假设可能是错误的。 谢谢!

回答

0

你可能想看看一些从我们的记忆页的建议: https://deeplearnin4j.org/memory

https://deeplearning4j.org/spark

一般来说,deeplearning4j都有自己的堆外的内存。你应该使用更多的执行器来设置较小的批处理大小,注意javacpp关闭堆配置,并将火花内存设置为允许的范围。