1

Disruptor Getting Started Guide之后,我已经与一个生产者和一个消费者建立了一个最小的破坏者。为什么使用较小的环形缓冲区会干扰较慢?

生产者

import com.lmax.disruptor.RingBuffer; 

public class LongEventProducer 
{ 
    private final RingBuffer<LongEvent> ringBuffer; 

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) 
    { 
     this.ringBuffer = ringBuffer; 
    } 

    public void onData() 
    { 
     long sequence = ringBuffer.next(); 
     try 
     { 
      LongEvent event = ringBuffer.get(sequence); 
     } 
     finally 
     { 
      ringBuffer.publish(sequence); 
     } 
    } 
} 

消费者(请注意消费者什么都不做onEvent

import com.lmax.disruptor.EventHandler; 

public class LongEventHandler implements EventHandler<LongEvent> 
{ 
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) 
    {} 
} 

我的目标是性能测试绕了一大圈缓冲去一次与穿越小环多次。在每种情况下,总操作数(bufferSize X rotations)是相同的。我发现,随着环形缓冲区变小,操作/秒速率急剧下降。

RingBuffer Size | Revolutions | Total Ops | Mops/sec 

    1048576  |  1  | 1048576 |  50-60 

     1024  |  1024  | 1048576 |  8-16 

     64  |  16384 | 1048576 | 0.5-0.7 

     8  |  131072 | 1048576 | 0.12-0.14 

问:什么是业绩的大规模下降的原因时,环缓冲区大小减少,但总的迭代次数是固定的?这种趋势与WaitStrategySingle vs MultiProducer无关 - 吞吐量降低,但趋势相同。

主要(注意SingleProducerBusySpinWaitStrategy

import com.lmax.disruptor.BusySpinWaitStrategy; 
import com.lmax.disruptor.dsl.Disruptor; 
import com.lmax.disruptor.RingBuffer; 
import com.lmax.disruptor.dsl.ProducerType; 

import java.util.concurrent.Executor; 
import java.util.concurrent.Executors; 

public class LongEventMainJava{ 
     static double ONEMILLION = 1000000.0; 
     static double ONEBILLION = 1000000000.0; 

    public static void main(String[] args) throws Exception { 
      // Executor that will be used to construct new threads for consumers 
      Executor executor = Executors.newCachedThreadPool();  

      // TUNABLE PARAMS 
      int ringBufferSize = 1048576; // 1024, 64, 8 
      int rotations = 1; // 1024, 16384, 131702 

      // Construct the Disruptor 
      Disruptor disruptor = new Disruptor<>(new LongEventFactory(), ringBufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); 

      // Connect the handler 
      disruptor.handleEventsWith(new LongEventHandler()); 

      // Start the Disruptor, starts all threads running 
      disruptor.start(); 

      // Get the ring buffer from the Disruptor to be used for publishing. 
      RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
      LongEventProducer producer = new LongEventProducer(ringBuffer); 

      long start = System.nanoTime(); 
      long totalIterations = rotations * ringBufferSize; 
      for (long i = 0; i < totalIterations; i++) { 
       producer.onData(); 
      } 
      double duration = (System.nanoTime()-start)/ONEBILLION; 
      System.out.println(String.format("Buffersize: %s, rotations: %s, total iterations = %s, duration: %.2f seconds, rate: %.2f Mops/s", 
        ringBufferSize, rotations, totalIterations, duration, totalIterations/(ONEMILLION * duration))); 
     } 
} 

并运行,你需要琐碎代码

import com.lmax.disruptor.EventFactory; 

public class LongEventFactory implements EventFactory<LongEvent> 
{ 
    public LongEvent newInstance() 
    { 
     return new LongEvent(); 
    } 
} 

运行在核心i5-2400,12GB RAM ,windows 7

样本输出

Buffersize: 1048576, rotations: 1, total iterations = 1048576, duration: 0.02 seconds, rate: 59.03 Mops/s 

Buffersize: 64, rotations: 16384, total iterations = 1048576, duration: 2.01 seconds, rate: 0.52 Mops/s 

回答

2

当制片人(S)填补了环形缓冲区,它必须等待,直到事件能够继续之前消耗。

当您的缓冲区大小与您要放入的元素数量大小相同时,制作人员无需等待。它永远不会溢出。它所做的只是增加计数,索引,并将数据发布到该索引处的环形缓冲区中。

当你的缓冲区较小时,它仍然只是递增计数和发布,但它比消费者能够消耗的速度快。因此生产者必须等到元素被消耗并且环形缓冲区上的空间被释放。

+0

谢谢。那么为什么在这个例子中是我的消费者,它实际上什么都不做,只能访问与生产者相比比较慢的基础'LongEvent'?我曾假设制片人会是限制因素。 –

+0

@AdamHughes你的部分不做任何事情,但'Disrupto'r基础设施在调用'onEvent'方法之前做了一些工作。恰恰相反,这比你的制作人更多的工作。 –

0

好像问题就在于这个代码块中lmax\disruptor\SingleProducerSequencer

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) 
     { 
      cursor.setVolatile(nextValue); // StoreLoad fence 

      long minSequence; 
      while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) 
      { 
       waitStrategy.signalAllWhenBlocking(); 
       LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 
      } 

      this.cachedValue = minSequence; 
     } 

特别调用LockSupport.parkNanos(1L)。这可能需要最多15ms on Windows。当生产者到达缓冲区的末尾并等待消费者时,这会被调用。其次,当缓冲区很小时,RingBuffer的错误共享可能会发生。我猜测这两种效应都在起作用。

最后,在基准测试之前,我可以使用JIT加速代码,其中有一百万次调用onData()。这得到了最好的情况下,> 80Mops/sec,但没有消除缓冲收缩的退化。