0

I am new to Spark. I am trying to figure out Spark's block eviction policy. Some people say it is LRU, for example this article and this one. What is Spark's current eviction policy, FIFO or LRU?

However, when I look at the source code of MemoryStore and BlockManager, I cannot find the LRU logic:

  1. There is a LinkedHashMap recording all blocks in the MemoryStore:

    // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and 
    // acquiring or releasing unroll memory, must be synchronized on `memoryManager`! 
    private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) 
    
  2. When a block is accessed, it is not explicitly moved to the head of the LinkedHashMap (see the sketch after the question):

    def getValues(blockId: BlockId): Option[Iterator[_]] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: SerializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
          val x = Some(values)
          x.map(_.iterator)
      }
    }
    
  3. In the eviction logic, the blocks selected for eviction follow the order of the LinkedHashMap's entrySet, which I assumed to be insertion order, i.e. first-in-first-out:

    private[spark] def evictBlocksToFreeSpace(
        blockId: Option[BlockId],
        space: Long,
        memoryMode: MemoryMode): Long = {
      assert(space > 0)
      memoryManager.synchronized {
        var freedMemory = 0L
        val rddToAdd = blockId.flatMap(getRddId)
        val selectedBlocks = new ArrayBuffer[BlockId]
        def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
          entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
        }
        // This is synchronized to ensure that the set of entries is not changed
        // (because of getValue or getBytes) while traversing the iterator, as that
        // can lead to exceptions.
        entries.synchronized {
          val iterator = entries.entrySet().iterator()
          while (freedMemory < space && iterator.hasNext) {
            val pair = iterator.next()
            val blockId = pair.getKey
            val entry = pair.getValue
            if (blockIsEvictable(blockId, entry)) {
              // We don't want to evict blocks which are currently being read, so we need to obtain
              // an exclusive write lock on blocks which are candidates for eviction. We perform a
              // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
              if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
                selectedBlocks += blockId
                freedMemory += pair.getValue.size
              }
            }
          }
        }
        ...
        if (freedMemory >= space) {
          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
            s"(${Utils.bytesToString(freedMemory)} bytes)")
          for (blockId <- selectedBlocks) {
            val entry = entries.synchronized { entries.get(blockId) }
            // This should never be null as only one task should be dropping
            // blocks and removing entries. However the check is still here for
            // future safety.
            if (entry != null) {
              dropBlock(blockId, entry)
            }
          }
          ...
        }
      }
    }
    

So, is Spark's eviction policy FIFO or LRU?
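
To check the behavior in question, here is a minimal, self-contained sketch (plain JVM code, not Spark; the object name and block ids are made up) of how a LinkedHashMap constructed with accessOrder = true behaves:

    import java.util.LinkedHashMap

    object AccessOrderDemo {
      def main(args: Array[String]): Unit = {
        // Same constructor arguments as MemoryStore's `entries`:
        // initialCapacity = 32, loadFactor = 0.75f, accessOrder = true.
        val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
        entries.put("rdd_0_0", 100L)
        entries.put("rdd_0_1", 100L)
        entries.put("rdd_0_2", 100L)

        // A plain get() moves the accessed entry to the tail of the
        // iteration order; no explicit "promotion" call is needed.
        entries.get("rdd_0_0")

        // entrySet() now iterates least-recently-used first.
        val it = entries.entrySet().iterator()
        while (it.hasNext) println(it.next().getKey)
        // prints: rdd_0_1, rdd_0_2, rdd_0_0
      }
    }

Because get() moves the accessed entry to the tail, entrySet() iterates least-recently-used entries first, even though no code ever promotes an entry explicitly.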

Answers

-1

I had the same question before, but the answer is rather tricky: from the code you pasted here, there is no explicit "promote" operation. But in fact, LinkedHashMap is a special data structure that maintains LRU order by itself.

0

You have the LinkedHashMap constructor in this line:

    private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

This is the LinkedHashMap constructor that creates an access-ordered map:

    LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder)

The boolean is set to true, which means the keys are sorted by access order, from least-recently accessed to most-recently accessed.
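
To see what that means for eviction, here is a hedged sketch (a hypothetical TinyStore, not Spark's MemoryStore; all names invented for illustration) of an eviction loop over such an access-ordered map, mirroring the entrySet() traversal in evictBlocksToFreeSpace:

    import java.util.LinkedHashMap
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for MemoryStore: block ids map to sizes,
    // and accessOrder = true keeps the map in LRU order.
    class TinyStore {
      private val entries = new LinkedHashMap[String, Long](32, 0.75f, true)

      def put(id: String, size: Long): Unit = entries.put(id, size)

      // Reading an entry promotes it to the tail, as MemoryStore's
      // getValues does implicitly via entries.get.
      def get(id: String): Option[Long] = Option(entries.get(id)).map(_.longValue)

      // Walk entrySet() from the head (least-recently-used entries first)
      // and select blocks until enough space is freed, mirroring
      // evictBlocksToFreeSpace.
      def evictToFreeSpace(space: Long): Seq[String] = {
        val selected = ArrayBuffer[String]()
        var freed = 0L
        val it = entries.entrySet().iterator()
        while (freed < space && it.hasNext) {
          val pair = it.next()
          selected += pair.getKey
          freed += pair.getValue
        }
        selected.foreach(id => entries.remove(id)) // drop after selection
        selected.toSeq
      }
    }

For example, after put("a", 100L); put("b", 100L); put("c", 100L); get("a"), a call to evictToFreeSpace(100L) selects "b", the least-recently-used block, which is exactly LRU eviction.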

+0

How does this answer the question? – luk2302

+0

Read my answer again! I said: "The boolean is set to true, which means the keys are sorted by access order, from least-recently accessed to most-recently accessed." So the eviction policy is LRU. The blocks are ordered according to their access order in the entries LinkedHashMap. The blocks selected for eviction follow the order of the LinkedHashMap's entrySet, which means the first block to be evicted is the least-recently-used one. –

+0

This answer is helpful and correct, thank you niko – leon