前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程之Disruptor

并发编程之Disruptor

作者头像
lyb-geek
发布2018-03-27 15:20:02
2.2K1
发布2018-03-27 15:20:02
举报
文章被收录于专栏:Linyb极客之路

一、Disruptor是什么

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直接称disruptor模式。

Disruptor最大特点是高性能,它被设计用于在生产者—消费者问题(producer-consumer problem,简称PCP)上获得尽量高的吞吐量(TPS,Transaction Per Second))和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升。

其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路为存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序提供了一种大幅提升性能(TPS)的方案。

从功能上来看,Disruptor实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。可以拿JDK的BlockingQueue做一个简单对比,以便更好地认识Disruptor是什么。

我们知道BlockingQueue是一个FIFO队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。这些都是Disruptor能做到的,与之不同的是,Disruptor能做更多:

同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图)

预分配用于存储事件内容的内存空间

针对极高的性能目标而实现的极度优化和无锁的设计

以上的描述虽然简单地指出了Disruptor是什么,但对于它“能做什么”还不是那么直截了当。一般性地来说,当你需要在两个独立的处理过程(两个线程)之间交换数据时,就可以使用Disruptor。当然使用队列(如上面提到的BlockingQueue)也可以,只不过Disruptor做得更好。

二、一些重要的概念

在对Disruptor的特性进行说明之前,有必要对几个概念做一些说明。

1、CAS

CAS是Compare and Swap的简写,顾名思义,这个方法的功能就是比较和替换。简单来说,比较和替换是使用一个期望值和一个变量的当前值进行比较,如果当前变量的值与我们期望的值相等,就使用一个新值替换当前变量的值。java.util.concurrent包完全建立在CAS之上,可见其在并发上的重要程度。

CAS在Java中是由Native方法实现的,具体细节在这就不再深究了,大致使用了机器指令级别的原子性优化,所以CAS提供了高效无锁的原子操作。

public final native boolean compareAndSwapInt(Object o, long offset,

int expected,

int x);

2、伪共享

CUP的伪共享问题的本质是:几个在逻辑上独立的变量,由于被cpu加载在同一个缓存行当中,当在多线程环境下,被不同的cpu执行,导致缓存行失效从而引起Cache命中率大幅降低。例如:当两个线程分别对一个数组中的两份数据进行写操作,每个线程操作不同index上的数据,看上去,两份数据之间是不存在同步问题的,但是,由于他们可能在同一个cpu缓存行当中,这就会使这一份缓存行出现大量的缓存失效。如前所述,当一份线程更新时要给另一份线程发送RFO(请求所有权/Request For Ownership,RFO)消息并使其缓存失效。解决这个问题的一个办法是让这个数组中不同index的数据在不同的缓存行:因为缓存行的大小是64个字节,那么只要让数组中每份数据的大小大于64个字节,就可以保证他们在不同的缓存行当中,就能避免这样的伪共享问题。不过这样做缺点也很明显,过多没意义的数据占用了宝贵的cache空间,故这种优化不到迫不得已还是慎用。

3、锁

区分竞争锁和非竞争锁对性能的影响非常重要。如果一个锁自始至终只被一个线程使用,那么JVM有能力优化它带来的绝大部分损耗。如果一个锁被多个线程使用过,但是在任意时刻,都只有一个线程尝试获取锁,那么它的开销要大一些。我们将以上两种锁称为非竞争锁。而对性能影响最严重的情况出现在多个线程同时尝试获取锁时。这种情况是JVM无法优化的,而且通常会发生从用户态到内核态的切换。现代JVM已对非竞争锁做了很多优化,使它几乎不会对性能造成影响。常见的优化有以下几种。

如果一个锁对象只能由当前线程访问,那么其他线程无法获得该锁并发生同步,因此JVM可以去除对这个锁的请求。

逃逸分析(escape analysis)可以识别本地对象的引用是否在堆中被暴露。如果没有,就可以将本地对象的引用变为线程本地的(thread local)。

编译器还可以进行锁的粗化(lock coarsening)。把邻近的synchronized块用相同的锁合并起来,以减少不必要的锁的获取和释放。因此,不要过分担心非竞争锁带来的开销,要关注那些真正发生了锁竞争的临界区中性能的优化。

三、Disruptor为什么这么快

1、利用CAS

使用CAS来保证多线程安全,与大部分并发队列使用的锁相比,CAS显然要快很多。CAS是CPU级别的指令,更加轻量,不必像锁一样需要操作系统提供支持,所以每次调用不需要在用户态与内核态之间切换,也不需要上下文切换。

2、缓存行填充

CPU缓存常以64 Bytes作为一个缓存行大小,缓存由若干个缓存行组成,缓存写回主存或主存写入缓存均是以行为单位,此外每个CPU核心都有自己的缓存(但是若某个核心对某缓存行做出修改,其他拥有同样缓存的核心需要进行同步),生产者和消费者的指针用long型表示,假设现在只有一个生产者和一个消费者,那么双方的指针间没有什么直接联系,只要不“挨着”,应该可以各改各的指针。OK前面说有点乱,但都是前提,下面问题来了:如果生产者和消费者的指针(加起来共16bytes)出现在同一个缓存行中会怎么样?例如CPU核心A运行的消费者修改了一下自己的指针值(P1),那么其他核心中所有缓存了P1的缓存行都将失效,并从主存重新调配。这样做的缺点显而易见,但是CPU和编译器并未聪明到避免这个问题,所以需要缓存行填充。虽然问题产生的原因很绕,但是解决方案却非常简单:对于一个long型的缓冲区指针,用一个长度为8的long型数组代替。如此一来,一个缓存行被这个数组填充满,线程对各自指针的修改不会干扰到他人。

3、避免垃圾回收

系统在高压力情况下频繁新建对象必定导致更频繁的GC,Disruptor避免这个问题的策略是:提前分配。在创建RingBuffer实例时,参数中要求给出缓冲区元素类型的Factory,创建实例时,Ring Buffer会首先用由Factory产生的实例将整个缓冲区填满,后面生产者生产时,不再直接新建对象,而是获得之前已经新建好的实例,然后设置其中的值。

4、批量操作

Disruptor默认的BatchEventProcessor会尽量把能处理的事件一次性处理完,而不是处理完一个事件就立即让出CPU资源。这种机制有效地减少了线程间竞争的概率。

四、Disruptor的组件模型

此小节将讲解一下Disruptor的一些主要组件的功能,进一步了解Disruptor的整体架构。

1、Ring Buffer

RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现事件的并行或者顺序处理甚至两种方式组合处理。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

2、Sequence

Sequence是Disruptor最核心的组件。其通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的处理进度。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

Sequencer

Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

3、Sequence Barrier

SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。 SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

4、Wait Strategy

Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。当消费者等待在SequenceBarrier上时,Disruptor 提供了多个WaitStrategy的实现,每种策略都具有不同性能和优缺点。在延迟和CPU资源的占用上有所不同,根据实际运行环境的CPU的硬件特点选择恰当的策略,并配合特定的JVM的配置参数,能够实现不同的性能提升。

BusySpinWaitStrategy:自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

BlockingWaitStrategy :使用锁和条件变量。CPU资源的占用少,延迟大。

SleepingWaitStrategy :在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

YieldingWaitStrategy :在多次循环尝试不成功后,选择让出CPU,等待下次调度。平衡了延迟和CPU资源占用,但延迟比较均匀。

PhasedBackoffWaitStrategy :上面多种策略的综合,CPU资源的占用少,延迟大。

5、EventProcessor

在Disruptor中,消费者是以EventProcessor的形式存在的。EventProcessor持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

6、EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现。

7、Producer

即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

整体结构图

四、如何使用Disruptor

Disruptor的jar包可以在Maven库中找到,也可以到Github中Disruptor项目中下载。接下来,我们以生产者与消费者之间传long型数据为例来演示如何使用Disruptor,生产者产生long型数据,消费者将其打印出来。

1、定义事件

事件(Event)就是通过Disruptor进行数据交换的载体,long型数据通过LongEvent在Disruptor中传递。

public class LongEvent {

private long value;

public void set(long value) {

this.value = value;

}

}

2、定义事件工厂

事件工厂(EventFactory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个Event实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个Event的实例,然后往Event实例中填充数据,之后再发布到RingBuffer中,之后由Consumer获得该Event实例并从中读取数据。

public class LongEventFactory implements EventFactory<LongEvent> {

public LongEvent newInstance() {

return new LongEvent();

}

}

3、定义事件处理的具体实现

事件定义好后,消费者需要处理具体事件,具体的处理逻辑可以在EventHandler<T>中实现。

public class LongEventHandler implements EventHandler<LongEvent> {

public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {

System.out.println("Event: " + event);

}

}

4、启动Disruptor

Disruptor 通过ExecutorService提供的线程来触发Consumer的事件处理。

Disruptor 定义了WaitStrategy接口用于抽象Consumer如何等待新事件,这是策略模式的应用。Disruptor提供了多个WaitStrategy的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的CPU的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。具体策略前面已经进行过介绍,在此不再赘述。

public class DisruptorStart {

public static void main(String[] args) {

LongEventFactory factory = new LongEventFactory();

int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是2的幂

//单生产者模式

Disruptor<LongEvent> disruptor = new Disruptor<>(factory,

ringBufferSize, Executors.defaultThreadFactory(),

ProducerType.SINGLE,

new YieldingWaitStrategy());

EventHandler<LongEvent> eventHandler = new LongEventHandler();

disruptor.handleEventsWith(eventHandler);

disruptor.start();

}

}

此时,Disruptor就已经可以工作了。

5、发布事件

我们假定一个生产者,先不管数据从何而来,假定数据从网络IO或者磁盘IO中获取的,拿到数据后系统自动回调onData方法。

public class LongEventProducer {

private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {

this.ringBuffer = ringBuffer;

}

public void onData(ByteBuffer bb) {

long sequence = ringBuffer.next(); // 获取下一个序列号

try {

LongEvent event = ringBuffer.get(sequence); // 根据序列号获取预分配的数据槽

event.set(bb.getLong(0)); // 向数据槽中填充数据

} finally {

ringBuffer.publish(sequence);

}

}

}

注意,最后的ringBuffer.publish方法必须包含在finally中以确保必须得到调用;如果某个请求的sequence未被提交,将会堵塞后续的发布操作或者其它的producer。Disruptor还提供另外一种形式的调用来简化以上操作,并确保publish 总是得到调用。

public class LongEventProducerWithTranslator {

private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {

this.ringBuffer = ringBuffer;

}

private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =

new EventTranslatorOneArg<LongEvent, ByteBuffer>() {

public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {

event.set(bb.getLong(0));

}

};

public void onData(ByteBuffer bb) {

ringBuffer.publishEvent(TRANSLATOR, bb);

}

}

Disruptor根据传入的参数提供了几种Translator接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg),使用种Translator有几点好处:

封装了发布的复杂性,防止遗漏ringBuffer.publish方法

可以将逻辑分散到单独的类中

可以使用Java8的lambda表达式表示Translator,使语法更简洁

6、关闭Disruptor

注意,如果Disruptor使用结束后,记得释放资源

disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;

整体调用流程

整体的调用过程

public class LongEventMain {

public static void main(String[] args) throws Exception {

// 事件工厂

LongEventFactory factory = new LongEventFactory();

// 指明RingBuffer的大小,必须为2的幂

int bufferSize = 1024;

Disruptor<LongEvent> disruptor = new Disruptor<>(factory,

bufferSize, Executors.defaultThreadFactory(),

ProducerType.SINGLE,

new YieldingWaitStrategy());

// 置入处理逻辑

disruptor.handleEventsWith(new LongEventHandler());

disruptor.start();

// 获取ringBuffer,用于发布事件

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

LongEventProducer producer = new LongEventProducer(ringBuffer);

ByteBuffer bb = ByteBuffer.allocate(8);

for (long l = 0; true; l++) {

bb.putLong(0, l);

producer.onData(bb);

Thread.sleep(1000);

}

}

五、性能对比

为了直观地感受Disruptor有多快,作者设计了一个性能对比测试:Producer发布100万次事件,从发布第一个事件开始计时,捕捉Consumer处理完所有事件的耗时。

在此就不详细阐述对比过程了,从测试结果看, Disruptor的性能比ArrayBlockingQueue高出了几乎一个数量级

六、Disruptor的一些高级特性

1、并行的事件处理

Disruptor提供多消费者并行处理事件的功能,使用如下配置即可:

... other code

//handler1~4会并行处理事件

disruptor.handleEventsWith(handler1, handler2, handler3, handler4);

RingBuffer<ValueEvent> ringBuffer = disruptor.start();

... other code

2、有顺序依赖的事件处理

Disruptor也提供多有先后顺序的消费者处理流程:

... other code

//handler1先于handler2、3、4处理事件,后三者并行处理

disruptor.handleEventsWith(handler1).then(handler2, handler3, handler4);

//handler5、6、7、8依次处理事件

disruptor.handleEventsWith(handler5).then(handler6).then(handler7).then(handler8);

RingBuffer<ValueEvent> ringBuffer = disruptor.start();

... other code

3、多链事件处理

Disruptor允许创建多个处理流程,链与链之间为并行处理关系,链中的handler为顺序关系:

... other code

//A链 handler1与2有先后顺序

disruptor.handleEventsWith(handler1).then(handler2);

//B链 handler3与4有先后顺序

disruptor.handleEventsWith(handler3).then(handler4);

//A链与B链为并行关系,没有顺序依赖

... other code

4、使用自定义的事件处理器

一般而言,提供了EventHandler 后,Disruptor会默认实例化一个BatchEventProcessor,由于Disruptor的EventProcessor不一定满足用户需求,用户可以自定义EventProcessor:

RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();

SequenceBarrier barrier = ringBuffer.newBarrier();

final MyEventProcessor customProcessor = new MyEventProcessor(ringBuffer, barrier);

disruptor.handleEventsWith(processor);

disruptor.start();

Disruptor将在start()方法调用后执行自定义的processor。如果需要自定义的processor按指定顺序处理事件,例如指定customProcessor在handler1、2之后处理event可以这样写:

SequenceBarrier barrier = disruptor.handleEventsWith(handler1, handler2).asBarrier();

final MyEventProcessor customProcessor = new MyEventProcessor(ringBuffer, barrier);

disruptor.handleEventsWith(customProcessor);

disruptor.start();

七、应用场景

个人思考下来,它适合一切异步环境,但是对于并发量小的场景不一定需要。在log4j2中,已经使用了disruptor进行日志记录。同样是用异步,选择disruptor会更快。

1、在一些获取验证码,发短信的场景下,对实时性要求不够,如果收不到,用户可以再次要求重发

2、对于一些奖品,卡券的发放,在高峰期,可以只入队,在之后用异步的方式慢慢发放。

3、对于比较复杂的逻辑可以进行并发操作

八、小结

disruptor作为一个高并发框架,从CPU层面对整个代码进行优化。具有如下特点

1、队列使用数组结构,而不是使用传统的链表结构,寻址更快

2、新生产的对象采用覆盖的方式(不是传统阻塞队列,删除->添加的逻辑),减少GC回收的负担

3、从CPU层面优化,对Sequencer进行内存分配补齐,消除Java伪共享(cpu缓存行)

4、多个线程同时访问,由于他们都通过序号器Sequencer访问ringBuffer,通过CAS取代了加锁和同步块,这也是并发编程的一个指导性原则:把同步块最小化到一个变量上。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-02-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Linyb极客之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
短信
腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档