2016-09-15 48 views

Implementing a double-buffered Java HashMap with unsynchronized reads

So I think I have this genius idea to solve a very specific problem, but I can't get rid of one last potential thread-safety issue. I'm wondering whether you folks have any ideas for solving it.

The problem:

A massive number of threads need to read from a HashMap that only rarely gets updated. The problem is that even with ConcurrentHashMap, i.e. the thread-safe version, reads can still hit a mutex, because the write methods still lock bins (i.e. portions of the map).

The idea:

Keep two HashMaps hidden behind a single facade: one that threads read from without synchronization, the other that threads write to, with synchronization of course, and every so often, flip them.

The obvious caveat is that the map is only eventually consistent, but let's assume that's good enough for the intended purpose.

The problem that remains is that there is still a race condition, even with an AtomicInteger and so on, because at the exact moment the flip happens, I can't be sure a reader hasn't slipped in... The issue sits between the startRead() method (lines 262-272) and the flip() method (lines 241-242).


Obviously ConcurrentHashMap is a very, very good class for this problem; I just wanted to see whether I could push the idea a bit further.

Anyone have any ideas?
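For what it's worth, the "AtomicInteger.compareGreaterThanAndIncrement()" primitive that the class comment below wishes existed can be approximated with a plain CAS loop. This is a minimal sketch of that idea only; the GatedCounter name and its methods are my own illustration, not part of the class below:

```java
import java.util.concurrent.atomic.AtomicInteger;

// A sketch of "increment only if the gate is open", built from a CAS loop.
class GatedCounter {
    // >= 0 : number of active readers; Integer.MIN_VALUE : gate closed
    private final AtomicInteger readers = new AtomicInteger(0);

    /** Atomically increments the reader count, but only while the gate is open. */
    boolean tryEnter() {
        for (;;) {
            int cur = readers.get();
            if (cur < 0) {
                return false; // gate closed; caller should wait and retry
            }
            if (readers.compareAndSet(cur, cur + 1)) {
                return true;
            }
        }
    }

    /** A reader is done. */
    void exit() {
        readers.decrementAndGet();
    }

    /** Closes the gate, spinning until all active readers have left. */
    void closeGate() {
        while (!readers.compareAndSet(0, Integer.MIN_VALUE)) {
            Thread.yield();
        }
    }

    /** Reopens the gate. */
    void openGate() {
        readers.set(0);
    }

    public static void main(String[] args) {
        GatedCounter g = new GatedCounter();
        System.out.println(g.tryEnter()); // true: gate starts open
        g.exit();
        g.closeGate();
        System.out.println(g.tryEnter()); // false: gate is closed
    }
}
```

Because the "am I allowed in?" check and the increment happen in one compareAndSet, a reader can no longer slip in between the check and the increment, which is exactly the window the question describes. (On Java 8+, AtomicInteger.getAndUpdate can express the same loop more compactly.)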


Here's the full code of the class. (Not fully debugged/tested, but you get the idea...)

    package org.nectarframework.base.tools; 

    import java.util.Collection; 
    import java.util.HashMap; 
    import java.util.LinkedList; 
    import java.util.Map; 
    import java.util.Set; 
    import java.util.concurrent.atomic.AtomicBoolean; 
    import java.util.concurrent.atomic.AtomicInteger; 
    /** 
    * 
    * This map is intended to be both thread safe, and have (mostly) non mutex'd 
    * reads. 
    * 
    * HOWEVER, if you insert something into this map, and immediately try to read 
    * the same key from the map, it probably won't give you the result you expect. 
    * 
    * The idea is that this map is in fact 2 maps, one that handles writes, the 
    * other reads, and every so often the two maps switch places. 
    * 
    * As a result, this map will be eventually consistent, and while writes are 
    * still synchronized, reads are not. 
    * 
    * This map can be very effective if handling a massive number of reads per unit 
    * time vs a small number of writes per unit time, especially in a massively 
    * multithreaded use case. 
    * 
    * This class isn't such a good idea because it's possible that between 
    * readAllowed.get() and readCounter.increment(), the flip() happens, 
    * potentially sending one or more threads on the Map that flip() is about to 
    * update. The solution would be an 
    * AtomicInteger.compareGreaterThanAndIncrement(), but that doesn't exist. 
    * 
    * 
    * @author schuttek 
    * 
    */ 

    public class DoubleBufferHashMap<K, V> implements Map<K, V> { 

     private Map<K, V> readMap = new HashMap<>(); 
     private Map<K, V> writeMap = new HashMap<>(); 
     private LinkedList<Triple<Operation, Object, V>> operationList = new LinkedList<>(); 

     private AtomicBoolean readAllowed = new AtomicBoolean(true); 
     private AtomicInteger readCounter = new AtomicInteger(0); 

     private long lastFlipTime = System.currentTimeMillis(); 
     private long flipTimer = 3000; // 3 seconds 

     private enum Operation { 
      Put, Delete; 
     } 

     @Override 
     public int size() { 
      startRead(); 
      RuntimeException rethrow = null; 
      int n = 0; 
      try { 
       n = readMap.size(); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return n; 
     } 

     @Override 
     public boolean isEmpty() { 
      startRead(); 
      RuntimeException rethrow = null; 
      boolean b = false; 
      try { 
       b = readMap.isEmpty(); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return b; 
     } 

     @Override 
     public boolean containsKey(Object key) { 
      startRead(); 
      RuntimeException rethrow = null; 
      boolean b = false; 
      try { 
       b = readMap.containsKey(key); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return b; 
     } 

     @Override 
     public boolean containsValue(Object value) { 
      startRead(); 
      RuntimeException rethrow = null; 
      boolean b = false; 
      try { 
       b = readMap.containsValue(value); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return b; 
     } 

     @Override 
     public V get(Object key) { 
      startRead(); 
      RuntimeException rethrow = null; 
      V v = null; 
      try { 
       v = readMap.get(key); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return v; 
     } 

     @Override 
     public synchronized V put(K key, V value) { 
      operationList.add(new Triple<>(Operation.Put, key, value)); 
      V previous = writeMap.put(key, value); 
      checkFlipTimer(); 
      return previous; 
     } 

     @Override 
     public synchronized V remove(Object key) { 
      // Not entirely sure if we should return the value from the read map or 
      // the write map... 
      operationList.add(new Triple<>(Operation.Delete, key, null)); 
      V v = writeMap.remove(key); 
      checkFlipTimer(); 
      return v; 
     } 

     @Override 
     public synchronized void putAll(Map<? extends K, ? extends V> m) { 
      for (K k : m.keySet()) { 
       V v = m.get(k); 
       operationList.add(new Triple<>(Operation.Put, k, v)); 
       writeMap.put(k, v); 
      } 
      checkFlipTimer(); 
     } 

     @Override 
     public synchronized void clear() { 
      writeMap.clear(); 
      checkFlipTimer(); 
     } 

     @Override 
     public Set<K> keySet() { 
      startRead(); 
      RuntimeException rethrow = null; 
      Set<K> sk = null; 
      try { 
       sk = readMap.keySet(); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return sk; 
     } 

     @Override 
     public Collection<V> values() { 
      startRead(); 
      RuntimeException rethrow = null; 
      Collection<V> cv = null; 
      try { 
       cv = readMap.values(); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return cv; 
     } 

     @Override 
     public Set<java.util.Map.Entry<K, V>> entrySet() { 
      startRead(); 
      RuntimeException rethrow = null; 
      Set<java.util.Map.Entry<K, V>> se = null; 
      try { 
       se = readMap.entrySet(); 
      } catch (RuntimeException t) { 
       rethrow = t; 
      } 
      endRead(); 
      if (rethrow != null) { 
       throw rethrow; 
      } 
      return se; 
     } 
     } 

     private void checkFlipTimer() { 
      long now = System.currentTimeMillis(); 
      if (this.flipTimer > 0 && now > this.lastFlipTime + this.flipTimer) { 
       flip(); 
       this.lastFlipTime = now; 
      } 
     } 

     /** 
     * Flips the two maps, and updates the map that was being read from to the 
     * latest state. 
     */ 
     @SuppressWarnings("unchecked") 
     private synchronized void flip() { 
      readAllowed.set(false); 
      while (readCounter.get() != 0) { 
       Thread.yield(); 
      } 

      Map<K, V> temp = readMap; 
      readMap = writeMap; 
      writeMap = temp; 

      readAllowed.set(true); 
      this.notifyAll(); 

      // Replay the operations that the old read map missed, then forget them, 
      // so the log doesn't grow (and get replayed) forever. 
      for (Triple<Operation, Object, V> t : operationList) { 
       switch (t.getLeft()) { 
       case Delete: 
        writeMap.remove(t.getMiddle()); 
        break; 
       case Put: 
        writeMap.put((K) t.getMiddle(), t.getRight()); 
        break; 
       } 
      } 
      operationList.clear(); 
     } 

     private void startRead() { 
      if (!readAllowed.get()) { 
       synchronized (this) { 
        try { 
         wait(); 
        } catch (InterruptedException e) { 
        } 
       } 
      } 
      readCounter.incrementAndGet(); 
     } 

     private void endRead() { 
      readCounter.decrementAndGet(); 
     } 

    } 
+3

If writes are rare, why worry about ConcurrentHashMap occasionally locking? – shmosel

+0

There are no line numbers in the code. – shmosel

+2

Have you considered using the copy-on-write pattern, as in CopyOnWriteArraySet or CopyOnWriteArrayList? That gives unsynchronized reads, at the cost of more expensive writes. –
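To illustrate that comment: the copy-on-write idea carries over from lists to maps. A hedged sketch, assuming a plain AtomicReference snapshot swap (CowMap is my own name; the JDK does not ship a copy-on-write map):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Copy-on-write map: a read is one volatile load, a write copies the snapshot.
class CowMap<K, V> {
    private final AtomicReference<Map<K, V>> ref =
            new AtomicReference<>(new HashMap<>());

    V get(K key) {
        return ref.get().get(key); // no lock, no CAS on the read path
    }

    synchronized V put(K key, V value) {
        Map<K, V> next = new HashMap<>(ref.get()); // copy the current snapshot
        V old = next.put(key, value);
        ref.set(next); // publish the new snapshot atomically
        return old;
    }

    public static void main(String[] args) {
        CowMap<String, Integer> m = new CowMap<>();
        m.put("a", 1);
        System.out.println(m.get("a")); // 1
    }
}
```

Every put copies the whole map, so this only pays off for read-heavy workloads like the one in the question; unlike the double-buffer idea, though, readers see each write as soon as it is published, with no flip and no race.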

Answer

1

I strongly suggest you learn how to use JMH; it's the first thing you should learn when optimizing algorithms and data structures.

For example, if you knew how to use it, you could quickly discover that with only 10% writes, ConcurrentHashMap performs very close to an unsynchronized HashMap:
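A benchmark along these lines might look like the following sketch. It requires the JMH dependency (org.openjdk.jmh:jmh-core plus the jmh-generator-annprocess annotation processor); the class body, key range and exact read/write mix are my reconstruction of the setup described here, not the answer's actual code:

```java
// Sketch only: needs JMH on the classpath to compile and run.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.openjdk.jmh.annotations.*;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@Threads(4)
public class SO_Benchmark {
    Map<Integer, Integer> concurrent = new ConcurrentHashMap<>();
    Map<Integer, Integer> usual = new HashMap<>(); // deliberately unsynchronized

    @Benchmark
    public Integer concurrentMap() {
        return mixedOp(concurrent);
    }

    @Benchmark
    public Integer usualMap() {
        return mixedOp(usual);
    }

    // ~10% writes, ~90% reads, matching the ratio quoted below.
    private Integer mixedOp(Map<Integer, Integer> map) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        int key = rnd.nextInt(1_000);
        if (rnd.nextInt(10) == 0) {
            return map.put(key, key);
        }
        return map.get(key);
    }
}
```

JMH generates the harness from the annotations; running it with different @Threads values produces throughput tables like the ones below.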

4 threads (10% writes):

Benchmark      Mode Cnt Score Error Units 
SO_Benchmark.concurrentMap thrpt 2 69,275   ops/s 
SO_Benchmark.usualMap   thrpt 2 78,490   ops/s 

8 threads (10% writes):

Benchmark      Mode Cnt Score Error Units 
SO_Benchmark.concurrentMap thrpt 2 93,721   ops/s 
SO_Benchmark.usualMap   thrpt 2 100,725   ops/s 

With an even smaller percentage of writes, ConcurrentHashMap's performance gets closer still to HashMap's.

Now, I modified startRead and endRead to be non-functional, but minimal:

private void startRead() { 
    readCounter.incrementAndGet(); 
    readAllowed.compareAndSet(false, true); 
} 

private void endRead() { 
    readCounter.decrementAndGet(); 
    readAllowed.compareAndSet(true, false); 
} 

And let's look at the performance:

Benchmark      Mode Cnt Score Error Units 
SO_Benchmark.concurrentMap thrpt 10 98,275 ± 2,018 ops/s 
SO_Benchmark.doubleBufferMap thrpt 10 80,224 ± 8,993 ops/s 
SO_Benchmark.usualMap   thrpt 10 106,224 ± 4,205 ops/s 

These results tell us that with an atomic counter and an atomic boolean modified on every operation, we cannot get better performance than ConcurrentHashMap. (I tried write proportions of 30%, 10% and 5%, but it never produced better performance for the DoubleBufferHashMap.)

Pastebin, if you're interested.

+0

Thanks, yes, that renders my question moot. I'll look into JMH! – schuttek