2015-10-20 16 views
3

我有一个Metrics类,它应该跟踪我们每秒处理多少个事务以及它们需要多长时间。其结构的相关部分看起来是这样的:多个变量之间的java线程安全

public class Metrics { 
    AtomicLong sent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    AtomicLong sentLastSecond = new AtomicLong(); 
    AtomicLong avgTimeLastSecond = new AtomicLong(); 

    public void outTick(long elapsedMsgTime){ 
     sent.getAndIncrement(); 
     totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
    } 

    class CalcMetrics extends TimerTask { 
     @Override 
     public void run() { 
      sentLastSecond.set(sent.getAndSet(0)); 
      long tmpElapsed = totalElapsedMsgTime.getAndSet(0); 
      long tmpSent = sentLastSecond.longValue(); 

      if(tmpSent != 0) { 
       avgTimeLastSecond.set(tmpElapsed/tmpSent); 
      } else { 
       avgTimeLastSecond.set(0); 
      } 
     } 
    } 
} 

我的问题是,outTick函数将被调用上百次从许多不同的线程中的第二。 AtomicLong已经确保每个变量都是单独的线程安全,并且它们不会在该函数中彼此交互,所以我不希望一个锁会让一个调用outTick阻塞另一个线程调用outTick。如果有几个不同的线程递增发送的变量,然后它们都添加到totalElapsedMsgTime变量中,那就很好了。

但是,一旦进入CalcMetrics运行方法(它只发生一次每秒一次),它们就会交互。我希望确保能够在不进行outTick调用的情况下重置这两个变量,或者在接收一个变量和下一个变量之间发生另一个outTick调用。

有没有办法做到这一点? (我的解释是否有意义?)有没有办法说A不能与B交错,但是多个B可以相互交错?


编辑:

我去,詹姆斯建议ReadWriteLock中。以下是我对任何感兴趣的人的结果:

public class Metrics { 
    AtomicLong numSent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    long sentLastSecond = 0; 
    long avgTimeLastSecond = 0; 

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 
    private final Lock readLock = readWriteLock.readLock(); 
    private final Lock writeLock = readWriteLock.writeLock(); 

    public void outTick(long elapsedMsgTime) { 
     readLock.lock(); 
     try { 
      numSent.getAndIncrement(); 
      totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
     } 
     finally 
     { 
      readLock.unlock(); 
     } 
    } 

    class CalcMetrics extends TimerTask { 

     @Override 
     public void run() { 
      long elapsed; 

      writeLock.lock(); 
      try { 
       sentLastSecond = numSent.getAndSet(0); 
       elapsed = totalElapsedMsgTime.getAndSet(0); 
      } 
      finally { 
       writeLock.unlock(); 
      } 

      if(sentLastSecond != 0) { 
       avgTimeLastSecond = (elapsed/sentLastSecond); 
      } else { 
       avgTimeLastSecond = 0; 
      } 
     } 
    } 
} 

回答

0

听起来像你需要一个读写器锁。 (java.util.concurrent.locks.ReentrantReadWriteLock)。

您的outTick()函数会锁定ReaderLock。允许任意数量的线程同时锁定ReaderLock。您的calcMetrics()会锁定WriterLock。一旦一个线程等待写作者锁定,就不允许新的读者进入,并且只有当所有的读者都不在时,作者才被允许进入。

您仍然需要原子来保护单个计数器,这些计数器会增加outTick()

+0

谢谢。这看起来像我在找什么。在我的特定用例中,“读取”和“写入”有点用词不当,因为我希望在更新它们时使用并发性(并因此使用ReaderLock),并在收集结果时使用专用WriterLock,但至少ReadWriteLock用相同的变量来解决排他性和并发区域两方面的问题。 (我对这种语言仍然很陌生 - 非常需要学习。) – Lesley

+0

@Lesley,有时它也被称为“共享/排他”锁。 –

0

使用锁(https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html)。一旦你实现了锁定,你将拥有更好的控制权。另外一个副作用是你将不再需要使用AtomicLong(尽管你仍然可以);您可以使用易变长来代替,这会更有效率。我没有在这个例子中做出改变。

基本上只是创建一个新的对象:

private Object lock = new Object(); 

然后,使用synchronized关键字与周围的一切在同一时间与同一把锁的另一个synchronized块不应该发生的代码的对象。例如:

synchronized(lock) 
{ 
    sent.getAndIncrement(); 
    totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
} 

所以,你的整个程序看起来就像这样(注:未经测试的代码)

public class Metrics { 
    private Object lock = new Object(); 

    AtomicLong sent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    AtomicLong sentLastSecond = new AtomicLong(); 
    AtomicLong avgTimeLastSecond = new AtomicLong(); 

    public void outTick(long elapsedMsgTime){ 
     synchronized (lock) 
     { 
      sent.getAndIncrement(); 
      totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
     } 
    } 

    class CalcMetrics extends TimerTask { 
     @Override 
     public void run() { 
      synchronized (lock) 
      { 
       sentLastSecond.set(sent.getAndSet(0)); 
       long tmpElapsed = totalElapsedMsgTime.getAndSet(0); 
       long tmpSent = sentLastSecond.longValue(); 

       if(tmpSent != 0) { 
        avgTimeLastSecond.set(tmpElapsed/tmpSent); 
       } else { 
        avgTimeLastSecond.set(0); 
       } 
      } 
     } 
    } 
} 

编辑:我扔一起快速(丑)效率测试程序,并发现,当我与锁同步,我得到整体更好的性能。请注意,由于Java JIT尚未将所有代码路径编译为机器码时的计时结果不代表长期运行时间,所以前2次运行的结果将被丢弃。

结果:

  • 随着锁:8365ms
  • 的AtomicLong:21254ms

代码:

import java.util.concurrent.atomic.AtomicLong; 

public class Main 
{ 
    private AtomicLong testA_1 = new AtomicLong(); 
    private AtomicLong testB_1 = new AtomicLong(); 

    private volatile long testA_2 = 0; 
    private volatile long testB_2 = 0; 

    private Object lock = new Object(); 

    private volatile boolean a = false; 
    private volatile boolean b = false; 
    private volatile boolean c = false; 

    private static boolean useLocks = false; 

    public static void main(String args[]) 
    { 
     System.out.println("Locks:"); 
     useLocks = true; 
     test(); 

     System.out.println("No Locks:"); 
     useLocks = false; 
     test(); 

     System.out.println("Locks:"); 
     useLocks = true; 
     test(); 

     System.out.println("No Locks:"); 
     useLocks = false; 
     test(); 
    } 

    private static void test() 
    { 
     final Main main = new Main(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.a = true; 
      } 
     }.start(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.b = true; 
      } 
     }.start(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.c = true; 
      } 
     }.start(); 

     long startTime = System.currentTimeMillis(); 

     // Okay this isn't the best way to do this, but it's good enough 
     while (!main.a || !main.b || !main.c) 
     { 
      try 
      { 
       Thread.sleep(1); 
      } catch (InterruptedException e) 
      { 
      } 
     } 

     System.out.println("Elapsed time: " + (System.currentTimeMillis() - startTime) + "ms"); 
     System.out.println("Test A: " + main.testA_1 + " " + main.testA_2); 
     System.out.println("Test B: " + main.testB_1 + " " + main.testB_2); 
     System.out.println(); 
    } 

    public void outTick(long elapsedMsgTime) 
    { 
     if (!useLocks) 
     { 
      testA_1.getAndIncrement(); 
      testB_1.getAndAdd(elapsedMsgTime); 
     } 
     else 
     { 
      synchronized (lock) 
      { 
       ++testA_2; 
       testB_2 += elapsedMsgTime; 
      } 
     } 
    } 
} 
+0

如果我不能得到我想要的工作,类似这样的东西将是我的备用解决方案。我的问题是,它阻止超过我想阻止。 (多次调用outTick会阻塞_each other_out,这将比每秒一次的定时器更频繁地发生) 然后,也许能够将AtomicLongs更改为volatile变量将获得与额外的阻塞一样多的效率成本。我不确定那一部分。 – Lesley

+0

他们已经互相阻拦了。 AtomicLong只是为你阻塞,所以你没有看到它,但效率将是相同的。更不用说对原始类型进行操作比调用方法效率更高。只要*确保他们是易变的*。 –

+0

@Lesley看到我的代码展示了更好的锁定效率。实际上,使用AtomicLong的锁(而不是volatile,long,作为示例代码)也证明比没有锁的AtomicLong更有效,因为AtomicLong的暴力冲突解决方法比Java锁更加低效。 –

1

通常的解决方案是包装的所有变量如一个原子数据类型。

class Data 
{ 
    long v1, v2; 

    Data add(Data another){ ... } 
} 

AtomicReference<Data> aData = ...; 

public void outTick(long elapsedMsgTime) 
{ 
    Data delta = new Data(1, elapsedMsgTime); 

    aData.accumulateAndGet(delta, Data:add); 
}  

就你而言,它可能不会比只是锁定更快。

java8还有另一个有趣的锁 - StampedLock。 javadoc示例与您的用例非常相似。基本上,你可以对多个变量进行乐观的读取;之后,检查以确保在读取期间没有写入。就你的情况而言,每秒写入“数百”,乐观的读取大多会成功。