我真的花时间研究实际的来源,出于纯粹的好奇心,其背后的想法很简单。在撰写本文时,最新版本是3.2.1。
有一个缓冲区存储预先分配的事件,这些事件将保存消费者阅读的数据。
缓冲区由其长度的标志(整数数组)组成,它描述了缓冲区插槽的可用性(详见进一步细节)。这个数组像java#AtomicIntegerArray一样被访问,所以为了这个完美的目的,你可能会认为它是一个。
可以有任何数量的生产者。当生产者想写入缓冲区时,会生成一个很长的数字(如调用AtomicLong#getAndIncrement,Disruptor实际上使用它自己的实现,但它以相同的方式工作)。我们称之为生成长的producerCallId。以类似的方式,当消费者ENDS从缓冲区中读取一个插槽时,生成consumerCallId。最近的consumerCallId被访问。
(如果有许多消费者,具有最低ID的呼叫choosen。)
这些ID然后被比较,并且如果两者之间的差异较小,缓冲侧,生产者被允许写。
(如果producerCallId比最近consumerCallId + BUFFERSIZE更大,这意味着缓冲器是满的,并且生产者被迫总线等待,直到一个点变得可用。)
然后生产者被分配基于他的callId(它是prducerCallId模缓冲区大小,但由于bufferSize总是2的幂(在缓冲区创建时强制执行限制),所使用的实际操作是producerCallId &(bufferSize-1))。然后可以自由修改该插槽中的事件。
(实际的算法是一个比较复杂,涉及在一个单独的原子参考缓存最近consumerId,用于优化的目的。)
当事件被修改,变化被“发布”。当发布标志阵列中的相应插槽时填充更新的标志。标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,实际操作是右移)
以类似的方式,可以有任意数量的消费者。消费者想要访问缓冲区,生成consumerCallId(取决于消费者如何添加到破坏者中,id生成中使用的原子可能会被共享或分离),然后将这个consumerCallId与最新的produCallId ,如果两者中较小者,则允许读者进步
(类似地,如果producerCallId对于consumerCallId是偶数,则意味着缓冲器是有效的并且消费者被迫等待。等待由WaitStrate定义)
对于个人消费者(拥有自己的id生成器的人),检查的下一件事是批量消费的能力。缓冲区中的槽按照从各自到consumerCallId的顺序(索引以与生产者相同的方式确定)到相应于最近的producerCallId的顺序来检查。
它们通过比较标志数组中写入的标志值与consumerCallId生成的标志值,在循环中进行检查。如果标志匹配,则意味着填充插槽的制作者已经进行了更改。如果不是,则循环中断,并返回最高提交的changeId。从ConsumerCallId到changeId中接收的插槽可以批量使用。
如果一群消费者一起阅读(具有共享ID生成器的消费者),每个消费者只需要一个callId,并且只检查并返回该单个callId的插槽。
谢谢迈克尔。你的写作和你提供的链接帮助我更好地理解它是如何工作的。其余的,我想我只需要让它沉入。 – Shahbaz
嗨迈克尔,请检查我的答案是否有错误。 – irreputable
我还有疑问:(1)'commit'是如何工作的? (2)当环形缓冲区已满时,生产者如何检测到所有消费者都看过数据以便生产者可以重新使用条目? – Qwertie