2011-07-02 44 views
195

我想了解disruptor pattern。我观看了InfoQ的视频并试图阅读他们的论文。我知道有一个环形缓冲区,它被初始化为一个非常大的数组,以利用缓存局部性,消除新内存的分配。LMAX的干扰模式如何工作?

这听起来像有其跟踪位置的一个或多个原子整数。每个'事件'似乎都得到一个唯一的ID,它的位置是通过找到其相对于环的大小等的模数而找到的。等等。

不幸的是,我没有一个直观的意义它是如何工作的。我做了很多交易应用程序,并研究了actor model,看了SEDA等。

在他们的介绍中他们提到这种模式基本上是路由器如何工作的;然而我还没有找到路由器如何工作的任何好的描述。

有更好的解释吗?

回答

199

的谷歌代码项目确实reference a technical paper的环形缓冲区的实现,但它是一个有点干,学术和坚韧去为别人想了解它是如何工作的。但是有一些博客文章已经开始以更可读的方式解释内部信息。有一个explanation of ring buffer即破碎图案的芯,description of the consumer barriers(部分相关以从破碎读取)和一些information on handling multiple producers可用。

的干扰器的最简单的描述是:它是尽可能以最有效的方式发送线程之间的消息的方法。它可以用作队列的替代品,但它也与SEDA和Actors共享许多功能。

相比队列:

的干扰物提供到传递消息到另一个线程的能力,如果需要的话(类似于BlockingQueue的)将其唤醒。但是,有三个明显的差异。

  1. Disruptor的用户通过扩展Entry类并提供工厂来执行预分配来定义消息的存储方式。这允许内存重用(复制)或者Entry可以包含对另一个对象的引用。
  2. 把消息插入到分裂器是2相处理中,首先一个时隙被要求中的环形缓冲器,其提供与该可填充有合适的数据条目的用户。然后必须提交条目,这种两阶段方法对于灵活使用上述内存是必要的。这是使消息线程可见消息的提交。
  3. 消费者有责任追踪从环形缓冲区消耗的消息。由于每个线程都维护自己的计数器,因此将此职责从环形缓冲区本身移开,有助于减少写入争用的数量。

相比于演员

角色模型更接近比大多数其他编程模型的干扰器,特别是如果你使用所提供的BatchConsumer/BatchHandler类。这些类隐藏了维护消耗序列号的所有复杂性,并在发生重要事件时提供一组简单的回调。但是,有一些细微的差异。

  1. 的干扰器采用了一枚1线 - 1个消费模型,其中参与者使用N:M模式,即你可以有很多的演员,只要你喜欢,他们会在线程之间的固定电话号码分布(一般为1元核心)。
  2. 的BatchHandler接口提供了额外的(和非常重要的)回调onEndOfBatch()。这允许缓慢的消费者,例如那些正在做I/O的事件一起批量处理以提高吞吐量。可以在其他Actor框架中进行批处理,但是由于几乎所有其他框架在批处理结束时都不提供回调,因此需要使用超时来确定批处理的结束,从而导致延迟较差。

相比SEDA

LMAX建成了干扰器模式,以取代基于SEDA的方法。

  1. 它提供的超过SEDA的主要改进是能够并行工作。为此,Disruptor支持将相同的消息(以相同的顺序)多次转换为多个消费者。这避免了在管道中需要分叉阶段。
  2. 我们也允许消费者等待其他消费者的结果,而不必在他们之间放置另一个排队阶段。消费者可以简单地观察它所依赖的消费者的序列号。这避免了管道中的连接阶段的需要。

相较于记忆障碍

另一种方式来思考它是作为一种结构化的,有序的内存屏障。生产者屏障形成写屏障,消费者屏障成为阅读障碍。

+1

谢谢迈克尔。你的写作和你提供的链接帮助我更好地理解它是如何工作的。其余的,我想我只需要让它沉入。 – Shahbaz

+1

嗨迈克尔,请检查我的答案是否有错误。 – irreputable

+0

我还有疑问:(1)'commit'是如何工作的? (2)当环形缓冲区已满时,生产者如何检测到所有消费者都看过数据以便生产者可以重新使用条目? – Qwertie

130

首先,我们想了解它提供的编程模型。

有一个或一个以上的作家。有一个或多个读者。有一行条目,完全从旧到新排列(从左到右)。作家可以在右端添加新条目。每个读者从左到右依次读取条目。显然,读者不能读过去的作家。

没有条目删除的概念。我使用“读者”而不是“消费者”来避免使用条目的图像。但是我们知道最后一位读者左边的条目变得毫无用处。

通常读者可以同时读取和独立读取。但是我们可以声明读者之间的依赖关系。读取器依赖关系可以是任意的非循环图。如果读者B依赖于读者,读者B不能读取过去的读者A.

读者依赖性的产生是因为读者可以注释的条目,并且阅读器B取决于注释。例如,A对条目进行一些计算,并将结果存储在条目中的字段a中。然后再继续前进,现在B可以读取该条目,并且存储的值为a。如果读者C不依赖于A,则C不应该尝试阅读a

这确实是一个有趣的编程模型。无论性能如何,仅凭借这一模式就可以获得大量应用。

当然,LMAX的主要目标是性能。它使用预先分配的条目环。这个环足够大,但是它是有限的,这样系统就不会超出设计容量。如果戒指已满,作者将等到最慢的读者前进并腾出空间。

为了减少垃圾收集成本,预先分配入口对象并永久生存。我们不插入新的条目对象或删除旧的条目对象,相反,作家要求预先存在的条目,填充其字段并通知读者。这种明显的两相作用确实是简单的原子动作

setNewEntry(EntryPopulator); 

interface EntryPopulator{ void populate(Entry existingEntry); } 

预分配条目也意味着在相邻的存储单元相邻条目(非常可能)定位,并且因为读者读取条目顺序,以利用CPU,这是重要缓存。

和很多的努力,避免锁,CAS,甚至内存屏障(如使用非挥发性序列变量,如果只有一个作家)

对于读者的开发商:不同的标注读者写信给不同的领域,避免写争用。 (实际上它们应该写入不同的缓存行。)注释读者不应该触及其他非依赖读者可能读取的内容。这就是为什么我说这些读者注释条目,而不是修改条目。

+1

对我来说看起来不错。我喜欢使用术语注释。 –

+21

+1这是试图描述破坏者模式如何实际工作的唯一答案,正如OP所要求的。 –

+0

哇!优秀的描述! –

16

我真的花时间研究实际的来源,出于纯粹的好奇心,其背后的想法很简单。在撰写本文时,最新版本是3.2.1。

有一个缓冲区存储预先分配的事件,这些事件将保存消费者阅读的数据。

缓冲区由其长度的标志(整数数组)组成,它描述了缓冲区插槽的可用性(详见进一步细节)。这个数组像java#AtomicIntegerArray一样被访问,所以为了这个完美的目的,你可能会认为它是一个。

可以有任何数量的生产者。当生产者想写入缓冲区时,会生成一个很长的数字(如调用AtomicLong#getAndIncrement,Disruptor实际上使用它自己的实现,但它以相同的方式工作)。我们称之为生成长的producerCallId。以类似的方式,当消费者ENDS从缓冲区中读取一个插槽时,生成consumerCallId。最近的consumerCallId被访问。

(如果有许多消费者,具有最低ID的呼叫choosen。)

这些ID然后被比较,并且如果两者之间的差异较小,缓冲侧,生产者被允许写。

(如果producerCallId比最近consumerCallId + BUFFERSIZE更大,这意味着缓冲器是满的,并且生产者被迫总线等待,直到一个点变得可用。)

然后生产者被分配基于他的callId(它是prducerCallId模缓冲区大小,但由于bufferSize总是2的幂(在缓冲区创建时强制执行限制),所使用的实际操作是producerCallId &(bufferSize-1))。然后可以自由修改该插槽中的事件。

(实际的算法是一个比较复杂,涉及在一个单独的原子参考缓存最近consumerId,用于优化的目的。)

当事件被修改,变化被“发布”。当发布标志阵列中的相应插槽时填充更新的标志。标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,实际操作是右移)

以类似的方式,可以有任意数量的消费者。消费者想要访问缓冲区,生成consumerCallId(取决于消费者如何添加到破坏者中,id生成中使用的原子可能会被共享或分离),然后将这个consumerCallId与最新的produCallId ,如果两者中较小者,则允许读者进步

(类似地,如果producerCallId对于consumerCallId是偶数,则意味着缓冲器是有效的并且消费者被迫等待。等待由WaitStrate定义)

对于个人消费者(拥有自己的id生成器的人),检查的下一件事是批量消费的能力。缓冲区中的槽按照从各自到consumerCallId的顺序(索引以与生产者相同的方式确定)到相应于最近的producerCallId的顺序来检查。

它们通过比较标志数组中写入的标志值与consumerCallId生成的标志值,在循环中进行检查。如果标志匹配,则意味着填充插槽的制作者已经进行了更改。如果不是,则循环中断,并返回最高提交的changeId。从ConsumerCallId到changeId中接收的插槽可以批量使用。

如果一群消费者一起阅读(具有共享ID生成器的消费者),每个消费者只需要一个callId,并且只检查并返回该单个callId的插槽。

6

this article

的破碎图案是由填充有预先分配的转移 对象的圆形 阵列(即,环缓冲器)备份的配料队列其使用存储器屏障同步生产者和 消费者通过序列。

内存障碍是一种很难解释和特丽莎的博客已经做了最好的尝试在我看来,这条信息:http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

但是,如果你不想深入到底层的细节您可以知道Java中的内存屏障是通过volatile关键字或通过java.util.concurrent.AtomicLong实现的。破坏者模式序列是AtomicLongs,并通过存储器屏障而不是锁来在生产者和消费者之间来回传递。

我觉得更容易理解通过代码的一个概念,所以下面的代码是一个简单的HelloWorld CoralQueue,这是与我关联CoralBlocks做了破坏者模式实现。在下面的代码中,您可以看到干扰程序模式如何实现批处理以及环缓冲区(即,圆形阵列)允许两个线程之间无垃圾通信:

package com.coralblocks.coralqueue.sample.queue; 

import com.coralblocks.coralqueue.AtomicQueue; 
import com.coralblocks.coralqueue.Queue; 
import com.coralblocks.coralqueue.util.MutableLong; 

public class Sample { 

    public static void main(String[] args) throws InterruptedException { 

     final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class); 

     Thread consumer = new Thread() { 

      @Override 
      public void run() { 

       boolean running = true; 

       while(running) { 
        long avail; 
        while((avail = queue.availableToPoll()) == 0); // busy spin 
        for(int i = 0; i < avail; i++) { 
         MutableLong ml = queue.poll(); 
         if (ml.get() == -1) { 
          running = false; 
         } else { 
          System.out.println(ml.get()); 
         } 
        } 
        queue.donePolling(); 
       } 
      } 

     }; 

     consumer.start(); 

     MutableLong ml; 

     for(int i = 0; i < 10; i++) { 
      while((ml = queue.nextToDispatch()) == null); // busy spin 
      ml.set(System.nanoTime()); 
      queue.flush(); 
     } 

     // send a message to stop consumer... 
     while((ml = queue.nextToDispatch()) == null); // busy spin 
     ml.set(-1); 
     queue.flush(); 

     consumer.join(); // wait for the consumer thread to die... 
    } 
}