以前一直听说有Disruptor这个东西,都是性能很强大,所以这几天自己也看了一下。 下面是自己的学习笔记,另外推荐几篇自己看到写的比较好的博客: Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案 Disruptor3.0的实现细节
SequenceBarrier
)配合使用来消除锁和CASSequence
:Sequence
:SingleProducerSequencer
a. 消费者的序号数值必须小于生产者序号数值;b. 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值; c. 生产者序号数值不能大于消费者最小的序号数值以避免生产者速度过快,将还未来得及消费的消息覆盖BlockingWaitStrategy
-- 查看源码YieldingWaitStrategy
BatchEventProcessor
Disruptor:Disruptor的入口,主要封装了环形队列RingBuffer、消费者集合ConsumerRepository的引用;主要提供了获取环形队列、添加消费者、生产者向RingBuffer中添加事件(可以理解为生产者生产数据)的操作; RingBuffer:Disruptor中队列具体的实现,底层封装了Object[]数组;在初始化时,会使用Event事件对数组进行填充,填充的大小就是bufferSize设置的值;此外,该对象内部还维护了Sequencer(序列生产器)具体的实现; Sequencer:序列生产器,分别有MultiProducerSequencer(多生产者序列生产器) 和 SingleProducerSequencer(单生产者序列生产器)两个实现类。上面的例子中,使用的是SingleProducerSequencer;在Sequencer中,维护了消费者的Sequence(序列对象)和生产者自己的Sequence(序列对象);以及维护了生产者与消费者序列冲突时候的等待策略WaitStrategy; Sequence:序列对象,内部维护了一个long型的value,这个序列指向了RingBuffer中Object[]数组具体的角标。生产者和消费者各自维护自己的Sequence;但都是指向RingBuffer的Object[]数组; Wait Strategy:等待策略。当没有可消费的事件时,消费者根据特定的策略进行等待;当没有可生产的地方时,生产者根据特定的策略进行等待; Event:事件对象,就是我们Ringbuffer中存在的数据,在Disruptor中用Event来定义数据,并不存在Event类,它只是一个定义; EventProcessor:事件处理器,单独在一个线程内执行,判断消费者的序列和生产者序列关系,决定是否调用我们自定义的事件处理器,也就是是否可以进行消费; EventHandler:事件处理器,由用户自定义实现,也就是最终的事件消费者,需要实现EventHandler接口;
RingBuffer
:
Sequence
:
这个里面缓存行的填充很经典,设计成前7后7 Long类型来填充,保证消除伪共享。 使用空间换时间,避免伪共享。Java8中使用@sun.misc.Contended 来消除伪共享,在运行时需要设置JVM启动参数:-XX:-RestrictContended
这里前7后7加上本身的Value值,总共是有15个Long元素,无论如何拆分,Value和预填充的Long型数据一定会处于单独的一个缓存行。
SingleProducerSequencer
这里就是用简单的if else判断,就避免了加锁,CAS的消耗,这里是使用序号栅栏,通过巧妙的算法+自旋操作来实现等待的操作。 解析如下图:
其中可以自己写代码去debug,创建ringBuffer长度为4,消费者阻塞在第0个元素的消费中。然后生产者再生产第5个元素的时候就会进行自旋等待。
BlockingWaitStrategy
BatchEventProcessor
waitFor 可以参考上面的BlockingWaitStrategy
的waitFor() 方法