Disruptor: how the framework works and how to use it

Author: 全栈程序员站长
Published 2022-09-30 19:43:23

Hello everyone, good to see you again. This is your friend 全栈君.

Disruptor source code

  • https://github.com/LMAX-Exchange/disruptor/blob/master/README.md
  • https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
  • https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

How Disruptor relates to RingBuffer

  • The storage part of Disruptor is implemented as a RingBuffer.
  • Disruptor provides the methods that Producer and Consumer threads use to pass data through the ringbuffer.

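What a RingBuffer really is

  • Fixed size
  • First in, first out (FIFO)
  • Follows the Producer-Consumer model
  • A block of memory that is used cyclically
  • Allocated once, so no space has to be freed and reallocated during the lifetime of the process

In essence it is a reusable FIFO queue.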
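
The properties listed above can be sketched in a few lines. This is a minimal single-threaded illustration, not Disruptor's implementation; the class name SimpleRingBuffer and its offer/poll methods are hypothetical, and it stores plain long values to stay short.

```java
// Minimal single-threaded sketch of a fixed-size, reusable FIFO ring buffer.
// Illustration only: SimpleRingBuffer is a hypothetical name, not a Disruptor class.
final class SimpleRingBuffer {
    private final long[] slots;  // allocated once and reused for the whole lifetime
    private final int mask;      // capacity - 1, valid because capacity is a power of two
    private long head = 0;       // sequence of the next slot to read
    private long tail = 0;       // sequence of the next slot to write

    SimpleRingBuffer(int capacity) {
        if (capacity < 1 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.slots = new long[capacity];
        this.mask = capacity - 1;
    }

    /** Writes a value, or returns false instead of overwriting when the buffer is full. */
    boolean offer(long value) {
        if (tail - head == slots.length) {
            return false;
        }
        slots[(int) (tail & mask)] = value;  // the sequence wraps around onto the same array
        tail++;
        return true;
    }

    /** Returns the oldest value, or null when the buffer is empty. */
    Long poll() {
        if (head == tail) {
            return null;
        }
        long value = slots[(int) (head & mask)];
        head++;
        return value;
    }
}
```

The sequences only ever grow; the mask maps them back onto the fixed array, which is why a power-of-two size is convenient.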

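When to use Disruptor

  • Producer-Consumer scenarios: one producer with many consumers, or many producers with many consumers (thread-safe)
  • Exchanging data between threads
  • A lightweight message queue
  • High demands on queue performance: Disruptor is reported to be about seven times faster than LinkedBlockingQueue, thanks to its lock-free design
  • The same "event" can have several consumers; the consumers can process it in parallel, or depend on one another so that processing follows an order (forming a dependency graph)
  • Typical case: Canal reads the binlog from a MySQL instance and puts it into a Disruptor; downstream there can be several concurrent consumers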

Core concepts

(Diagram from the official wiki: https://github.com/LMAX-Exchange/disruptor/wiki/Introduction)

Ring Buffer

  • com/lmax/disruptor/RingBuffer.java
  • The core storage of Disruptor, a circular buffer.

Sequence

  • com/lmax/disruptor/Sequence.java
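  • Every Consumer (EventProcessor) and the Disruptor itself each hold a Sequence.
  • It is used to track the progress of the ringbuffer and of every Consumer. The source comment puts it as: "tracking the progress of the ring buffer and event processors".
  • The concurrency-related code mainly depends on changes of Sequence values.
  • At its core, a Sequence is a protected volatile long value;
  • A Sequence can be understood as an enhanced AtomicLong, with extra code that prevents false sharing. (On false sharing: https://www.cnblogs.com/blastbao/p/8290332.html)
  • How Sequence avoids false sharing, briefly: padding is placed around the value so that a Sequence fills a cache line of its own, which avoids false sharing and improves performance. The source comment reads: "Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field."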
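
The padding idea can be sketched as follows. This is a simplified illustration that assumes 64-byte cache lines; the class names are hypothetical, and it uses AtomicLongFieldUpdater for the atomic operations, which is not necessarily what the library itself uses. The padding is split across a small class hierarchy on the assumption that the JVM does not interleave subclass fields with superclass fields.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Sketch of a cache-line-padded counter in the spirit of Disruptor's Sequence.
// All class names here are hypothetical; 64-byte cache lines are an assumption.
class LeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;        // 56 bytes in front of the value
}

class PaddedValue extends LeftPadding {
    protected volatile long value;                     // the only field that is really used
}

class RightPadding extends PaddedValue {
    protected long p9, p10, p11, p12, p13, p14, p15;   // 56 bytes behind the value
}

final class PaddedSequence extends RightPadding {
    private static final AtomicLongFieldUpdater<PaddedValue> UPDATER =
        AtomicLongFieldUpdater.newUpdater(PaddedValue.class, "value");

    PaddedSequence(long initialValue) {
        this.value = initialValue;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        this.value = newValue;
    }

    /** Atomically moves the value from expected to update; returns false if another thread got there first. */
    boolean compareAndSet(long expected, long update) {
        return UPDATER.compareAndSet(this, expected, update);
    }

    long incrementAndGet() {
        return UPDATER.incrementAndGet(this);
    }
}
```

With padding on both sides, two such counters updated by different threads should not land on the same cache line, so a write to one does not invalidate the other in a neighbouring core's cache.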

Sequencer

  • com/lmax/disruptor/MultiProducerSequencer.java
  • com/lmax/disruptor/SingleProducerSequencer.java
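  • The core component of Disruptor
  • Coordinates how Producers and Consumers use the same ringBuffer
  • Contains the concurrency algorithms that pass data between producers and consumers quickly and correctly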

Sequence Barrier

  • Created by the Sequencer
  • The Sequence Barrier contains the logic that decides whether a Consumer has data available to consume
// com/lmax/disruptor/BatchEventProcessor.java
    private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                // Call sequenceBarrier.waitFor(nextSequence) to find out up to which position data can be consumed right now
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

// com/lmax/disruptor/ProcessingSequenceBarrier.java
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

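        // So the core of a SequenceBarrier is using the waitStrategy to waitFor data.
        // The variables below:
        // sequence: the position this consumer wants to consume
        // cursorSequence: the position up to which the ringBuffer has been produced
        // dependentSequence: the consume position of the other consumers this consumer depends on
        //   (if this consumer reads only from the ringBuffer and depends on no other consumer,
        //   dependentSequence is the same object as cursorSequence; see the ProcessingSequenceBarrier constructor)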
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
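
The decision a barrier makes can be reduced to a small, non-blocking sketch. It assumes plain AtomicLong counters in place of Sequence objects; SimpleBarrier and its methods are hypothetical names, and the real barrier additionally handles alerts, waiting and out-of-order publication.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, non-blocking sketch of what a sequence barrier decides.
// SimpleBarrier is a hypothetical class, not part of the Disruptor API.
final class SimpleBarrier {
    private final AtomicLong cursor;        // how far the producers have published
    private final AtomicLong[] dependents;  // positions of upstream consumers, may be empty

    SimpleBarrier(AtomicLong cursor, AtomicLong... dependents) {
        this.cursor = cursor;
        this.dependents = dependents;
    }

    /** Highest sequence this consumer may read right now. */
    long highestReadable() {
        if (dependents.length == 0) {
            return cursor.get();            // no upstream consumers: only the producers gate us
        }
        long min = Long.MAX_VALUE;
        for (AtomicLong dependent : dependents) {
            min = Math.min(min, dependent.get());  // the slowest upstream consumer gates us
        }
        return min;
    }

    boolean isAvailable(long sequence) {
        return highestReadable() >= sequence;
    }
}
```

A consumer that sits behind two other consumers can therefore never overtake the slower of the two, which is how the dependency graph mentioned earlier is enforced.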

Wait Strategy

  • com/lmax/disruptor/BlockingWaitStrategy.java
  • com/lmax/disruptor/BusySpinWaitStrategy.java
// After a Producer has written new data into the ringBuffer, how is the Consumer notified?
// That logic lives in the waitStrategy.
// Taking BlockingWaitStrategy as the example:
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            synchronized (mutex)
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    // If there is no data, call wait() on mutex (a plain Object); the code blocks here
                    // until notify / notifyAll is called, then continues.
                    // A thread blocked in wait() gives up its CPU time slice.
                    // Who calls notify / notifyAll? BlockingWaitStrategy has a signalAllWhenBlocking method
                    // that calls notifyAll, and it is invoked when a Producer calls publish on the ringBuffer.
                    mutex.wait();
                }
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    public void signalAllWhenBlocking()
    {
        synchronized (mutex)
        {
            mutex.notifyAll();
        }
    }
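
The wait/notify handshake shown above can be tried out with the standard library alone. The following sketch assumes a single AtomicLong as the cursor; BlockingCursor and its demo method are hypothetical names used for illustration only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the blocking handshake between producer and consumer.
// BlockingCursor is a hypothetical class, not the library's BlockingWaitStrategy.
final class BlockingCursor {
    private final Object mutex = new Object();
    private final AtomicLong cursor = new AtomicLong(-1);

    /** Consumer side: block until the cursor has reached the requested sequence. */
    long waitFor(long sequence) throws InterruptedException {
        synchronized (mutex) {
            while (cursor.get() < sequence) {
                mutex.wait();  // releases the monitor and parks the thread
            }
        }
        return cursor.get();
    }

    /** Producer side: advance the cursor, then wake every waiting consumer. */
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    /** Runs one consumer thread that waits for sequence 0 and returns what it saw. */
    static long demo() {
        BlockingCursor blockingCursor = new BlockingCursor();
        long[] seen = {-1};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = blockingCursor.waitFor(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        blockingCursor.publish(0);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

Because the consumer re-checks the cursor while holding the monitor, a publish that happens just before the wait cannot be missed: the producer's notifyAll has to wait for the same monitor.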

Event

  • The unit of data that a Producer passes to a Consumer

EventProcessor

  • com/lmax/disruptor/BatchEventProcessorTest.java
  • com/lmax/disruptor/BatchEventProcessor.java
  • The main event loop that handles the data produced by the Disruptor; it owns the Consumer's Sequence. The source comment reads: "The main event loop for handling events from the Disruptor and has ownership of consumer's Sequence".
  • In practice it is the Consumer's main loop. The Consumer only has to inject an eventHandler, and BatchEventProcessor calls eventHandler.onEvent() to process the data that Producers write into the ringbuffer.
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                
                // Fetch the data and process it
                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            ......
        }
    }

EventHandler

  • Disruptor only defines the interface.
  • The Consumer implements it and injects it into the EventProcessor.
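
How an injected handler is driven by the batch loop can be sketched without the library. SimpleHandler below is a hypothetical stand-in for the EventHandler interface, and drain mirrors only the inner while loop of processEvents; waiting, exception handling and sequence bookkeeping are left out.

```java
// Hypothetical stand-in for the library's EventHandler interface.
interface SimpleHandler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch);
}

// Sketch of the inner batch loop of an event processor (illustration only).
final class BatchLoopSketch {
    /**
     * Hands every event in [nextSequence, availableSequence] to the handler
     * and returns the next sequence to read afterwards.
     * The ring length is assumed to be a power of two.
     */
    static <T> long drain(T[] ring, long nextSequence, long availableSequence, SimpleHandler<T> handler) {
        while (nextSequence <= availableSequence) {
            T event = ring[(int) (nextSequence & (ring.length - 1))];
            // endOfBatch is true only for the last event that is currently available
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return nextSequence;
    }
}
```

The endOfBatch flag lets a handler defer expensive work, such as flushing to disk, until the last event of the batch it was just given.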

Producer

  • The producer, which writes events into the ringbuffer

Why Disruptor is fast and thread-safe

Reference article: http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

http://ifeve.com/dissecting-disruptor-whats-so-special/

http://ifeve.com/locks-are-bad/ (why locks are slow, and how Disruptor avoids them)

http://ifeve.com/disruptor-cacheline-padding/ (cache line padding)

http://ifeve.com/disruptor-memory-barrier/ (memory barriers)

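In short:

  • It is an array, so it is faster than a linked list (adding and removing is simpler and it uses less memory), and the CPU cache can prefetch it
  • The array object itself lives for the whole run, which avoids garbage-collecting a large object; the entries are preallocated and reused as well, so only the data they reference still has to be collected
  • Where thread safety has to be guaranteed, CAS is used instead of locks.
  • No contention = no locks = very fast.
  • Every Consumer records its own position (Sequence), which allows several Producers and several Consumers to share one ringbuffer.
  • A Sequence is tracked in every object (ring buffer, claim strategy, producers and consumers); together with the cache line padding of Sequence, this means there is no false sharing and no unexpected contention.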

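In my opinion the most important design decisions are:

  • Every Consumer holds its own Sequence, so the Consumers consume independently of one another.
  • A Producer decides from the Sequence positions of all Consumers whether it can write into the ringbuffer, and at which position.
  • When Producers write concurrently, CAS is used to avoid locks. (See the code analysis below.)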
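
The CAS idea from the last point can be demonstrated with an AtomicLong. This is a minimal sketch under the assumption that each producer claims exactly one slot at a time and that the buffer never fills up; CasClaimSketch and its methods are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: several producers claim distinct sequences with CAS instead of a lock.
// CasClaimSketch is a hypothetical class; capacity checks are deliberately left out.
final class CasClaimSketch {
    private final AtomicLong cursor = new AtomicLong(-1);

    /** Claims the next sequence; retries when another producer wins the race. */
    long claimNext() {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    /** Runs several producer threads and returns how many distinct sequences were claimed. */
    static int runDemo(int producers, int claimsPerProducer) {
        CasClaimSketch sketch = new CasClaimSketch();
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < claimsPerProducer; j++) {
                    claimed.add(sketch.claimNext());
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed.size();
    }
}
```

If two threads read the same current value, only one compareAndSet succeeds; the loser re-reads the cursor and tries again, so no sequence is handed out twice. Calling CasClaimSketch.runDemo(4, 1000) should therefore report 4000 distinct sequences.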

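Key questions

  • Question 1: how do several Producers coordinate when writing data into the ringBuffer?
  • Question 2: how does the ringbuffer tell each Producer, based on how fast each consumer consumes, whether it can write right now?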
// I started 2 Producers locally.
LongEventProducer producer = new LongEventProducer(ringBuffer);
LongEventProducer producer2 = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    producer.onData(bb);
    producer2.onData(bb);
    Thread.sleep(1000);
}

// com/lmax/disruptor/shicaiExample/LongEventProducer.java
public void onData(ByteBuffer bb)
{
    // Claim the next free slot of the ringBuffer so that this producer can write its data there.
    long sequence = ringBuffer.next();  // Grab the next sequence
    try
    {
        LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
        event.set(bb.getLong(0));  // Fill with data
    }
    finally
    {
        ringBuffer.publish(sequence);
    }
}

// So how does ringBuffer.next() coordinate between several producers?
// com/lmax/disruptor/RingBuffer.java
/**
 * Increment and return the next sequence for the ring buffer.  Calls of this
 * method should ensure that they always publish the sequence afterward.  E.g.
 */
public long next()
{
    // This sequencer is com/lmax/disruptor/MultiProducerSequencer.java
    return sequencer.next();
}

// com/lmax/disruptor/MultiProducerSequencer.java: this is the heart of disruptor, the Sequencer.
// next() delegates to next(1).
public long next(int n)
{
    if (n < 1 || n > bufferSize)
    {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }

    long current;
    long next;

    do
    {
        // cursor is a Sequence; all Producers share this one Sequence for write control
        current = cursor.get();
        next = current + n;

        // Check whether the ringBuffer is full
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            // What are the gatingSequences?
            // They are the sequences of all consumers. They are registered with the
            // MultiProducerSequencer through updateGatingSequencesForNextInChain when a consumer is created.
            // Here we pick the smallest of the consumer sequences, i.e. the slowest consumer.
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            // If the buffer is full, keep retrying (parking for 1 ns each time) until a consumer has consumed.
            // Once a consumer advances, gatingSequence changes, gatingSequenceCache is updated, and
            // wrapPoint > cachedGatingSequence is evaluated again, so the Producer may get a writable slot.
            // Why is there a gatingSequenceCache? Only to reduce the number of getMinimumSequence calls;
            // the logic is the same as without the cache.
            // This answers question 2.
            if (wrapPoint > gatingSequence)
            {
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }

            gatingSequenceCache.set(gatingSequence);
        }
        // CAS makes sure that several Producers can claim slots correctly and without conflicts.
        // This answers question 1.
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);

    return next;
}

// The gating sequences are added when the consumers' handlers are registered, before the EventProcessors start
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

// What does publish of MultiProducerSequencer do?
public void publish(final long sequence)
{
    setAvailable(sequence);                // mark the published slot as available
    waitStrategy.signalAllWhenBlocking();  // notify all consumers
}
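  • The gating sequences are added when a Consumer's handlers are registered (handleEventsWith calls updateGatingSequencesForNextInChain), before the EventProcessor threads are started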
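
The wrap-point arithmetic in next(int n) is easier to follow with concrete numbers. The sketch below isolates that check; WrapPointSketch and its methods are hypothetical names, and plain long values stand in for the Sequence objects.

```java
// Sketch of the "is the ring full?" arithmetic used when a producer claims slots.
// WrapPointSketch is a hypothetical helper, not part of the Disruptor API.
final class WrapPointSketch {
    /**
     * True when claiming n more slots after cursor would overwrite a slot
     * that the slowest consumer has not read yet.
     */
    static boolean wouldWrap(long cursor, int n, int bufferSize, long slowestConsumer) {
        long next = cursor + n;
        long wrapPoint = next - bufferSize;  // the sequence whose slot would be overwritten
        return wrapPoint > slowestConsumer;
    }

    /** Smallest consumer sequence, or fallback when there are no consumers. */
    static long minimum(long[] consumerSequences, long fallback) {
        long min = fallback;
        for (long sequence : consumerSequences) {
            min = Math.min(min, sequence);
        }
        return min;
    }
}
```

With bufferSize 4 and cursor 3, slots 0 to 3 are written. If the slowest consumer is still at -1 (nothing consumed), next is 4 and wrapPoint is 0, and 0 > -1 means the producer has to wait. As soon as that consumer has processed sequence 0, wrapPoint 0 is no longer greater than the gating sequence, and slot 0 can be reused for sequence 4.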

  • Question 3: how is a Consumer registered?
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());

// com/lmax/disruptor/dsl/Disruptor.java
/**
 * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
 * as soon as they become available, in parallel.</p>
 */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        // Add the consumer to the consumer repository. The effect resembles the observer pattern:
        // once the ringBuffer has data, every registered consumer thread gets to process it.
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    // Two kinds of sequences: barrierSequences (the upstream consumers this group depends on)
    // and processorSequences (the sequences of the consumers created here)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

// A SequenceBarrier controls how far a consumer may read.
// com/lmax/disruptor/SequenceBarrier.java
/** Wait for the given sequence to be available for consumption. */
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;

/** Get the current cursor value that can be read. */
long getCursor();

// Looking one level deeper:
// com/lmax/disruptor/ProcessingSequenceBarrier.java
@Override
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();

    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    if (availableSequence < sequence)
    {
        return availableSequence;
    }

    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

@Override
public long getCursor()
{
    // Return the current position; dependentSequence is a Sequence object
    return dependentSequence.get();
}

// When the ringBuffer has new data, how does disruptor notify each consumer?
// Every consumer runs in its own thread, started here; that thread waits in the barrier
// until publish signals the wait strategy.
/** <p>Starts the event processors and returns the fully configured ring buffer.</p> */
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        // Start one thread per consumer; the thread's logic is driven by BatchEventProcessor.
        // BatchEventProcessor has an eventHandler field, which is the processing code we write ourselves.
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

Demo

  • Event
public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    public long getValue()
    {
        return value;
    }
}
  • Consumer
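import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        try
        {
            // Simulate a slow consumer
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
        System.out.println("Event: " + event.getValue());
    }
}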
  • Producer
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            // Always publish, even if filling the event failed, so consumers are not blocked on this slot
            ringBuffer.publish(sequence);
        }
    }
}
  • Main
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event. The LongEventFactory class is not listed in this article,
        // so a constructor reference is used as the EventFactory here.
        EventFactory<LongEvent> factory = LongEvent::new;

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 2;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handlers: two independent consumers, each of which sees every event
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducer producer2 = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            producer2.onData(bb);
            Thread.sleep(1000);
        }
    }
}

Publisher: 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/195345.html (original site: https://javaforall.cn)

Originally published: 2022-09-10
