
I have run into a problem where Accumulators in Spark are never garbage collected. How can I make sure that accumulators that are no longer used get garbage collected?

def newIteration (lastParams: Accumulable[Params, (Int, Int, Int)], lastChosens: RDD[Document], i: Int): Params = { 
    if (i == maxIteration) 
     return lastParams.value 

    val size1: Int = 100 
    val size2: Int = 1000 

    // each iteration generates a new accumulator 
    val params = sc.accumulable(Params(size1, size2)) 

    // there is a map operation here 
    // if I only use lastParams, the result is not updated, 
    // but params solves this problem 
    val chosen = data.map { 
     case(Document(docID, content)) => { 
      lastParams += (docID, content, -1) 
      val newContent = lastParams.localValue.update(docID, content) 
      lastParams += (docID, newContent, 1) 
      params += (docID, newContent, 1) 
      Document(docID, newContent) 
     } 
    }.cache() 
    chosen.count() 
    lastChosens.unpersist() 
    return newIteration(params, chosen, i + 1) 
} 
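
For context, the snippet relies on definitions that are not shown in the question (Params, Document, and an implicit AccumulableParam for sc.accumulable). A minimal sketch of the shapes the calls above imply is given below; the concrete fields and the merge logic are assumptions, not the original code:

import org.apache.spark.AccumulableParam

// hypothetical shapes implied by the snippet above
case class Document(docID: Int, content: Int)

case class Params(size1: Int, size2: Int) {
  // the real Params presumably keeps per-document state of these sizes
  def update(docID: Int, content: Int): Int = content
}

// implicit so that sc.accumulable(Params(size1, size2)) can pick it up
implicit object ParamsAccumulableParam extends AccumulableParam[Params, (Int, Int, Int)] {
  // fold one (docID, content, delta) triple into the running Params
  def addAccumulator(acc: Params, elem: (Int, Int, Int)): Params = acc
  // merge two partial Params produced on different executors
  def addInPlace(p1: Params, p2: Params): Params = p1
  def zero(initial: Params): Params = initial
}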

The problem is that the memory it allocates keeps growing until it hits the memory limit. It looks like lastParams is never garbage collected. The classes RDD and Broadcast have an unpersist() method, but I cannot find anything like that for Accumulable in the documentation.
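
For comparison, this is roughly what the explicit release looks like for the two classes mentioned above, using the standard Spark 1.x API (sc and data are from the question; the broadcast value is just a placeholder). Accumulable exposes no comparable call:

// RDDs and broadcast variables can be released by hand:
val cached = data.cache()
cached.count()
cached.unpersist()      // drops this RDD's cached blocks

val bc = sc.broadcast(Array(1, 2, 3))
// ... use bc.value inside tasks ...
bc.unpersist()          // removes the broadcast blocks from the executors

// Accumulable/Accumulator objects have no such unpersist() method in this API.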

Why can't an Accumulable be garbage collected automatically, or is there a better solution?

Answer