前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解Disruptor

深入理解Disruptor

作者头像
JavaEdge
发布2023-01-14 09:55:31
4900
发布2023-01-14 09:55:31
举报
文章被收录于专栏:JavaEdgeJavaEdge

Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。

1 缓慢的锁

Disruptor作为一个高性能的生产者-消费者队列系统,核心就是通过RingBuffer实现一个无锁队列。

Jdk像LinkedBlockingQueue队列库,比Disruptor RingBuffer慢很多。

1.1 链表数据在内存布局对高速缓存不友好

RingBuffer使用数组:

1.2 锁依赖

生产者-消费者模式里,可能有多个消费者,也可能多个生产者。 多个生产者都要往队尾指针添加新任务,产生多线程竞争。于是,做这事时,生产者就要拿到对队尾的锁。同样多个消费者去消费队头时,也就产生竞争。同样消费者也要拿到锁。

那只有一个生产者或一个消费者,是不是就没锁的竞争问题?No!生产者-消费者模式下,消费者比生产者快。不然,队列会积压,任务越堆越多:

  • 越来越多的任务没能及时完成
  • 内存也放不下

虽然生产者-消费者模型下,都有队列作为缓冲区,但大部分情况下,这缓冲区空。即使只有一个生产者和一个消费者,这生产者指向的队尾和消费者指向的队头是同一节点。于是,这两个生产者和消费者之间一样产生锁竞争。

在LinkedBlocking Queue锁机制通过ReentrantLock实现。用Java在JVM上直接实现的加锁机制,由JVM进行裁决。这锁的争夺,会把没有拿到锁的线程挂起等待,也需经过一次上下文切换(Context Switch)。

这上下文切换要做的和异常和中断里的一样。上下文切换过程,要把当前执行线程的寄存器等信息,保存到线程栈。即已加载到高速缓存的指令或数据,又回到主内存,会进步拖慢性能。

Disruptor的Benchmark测试:把一个long类型counter,从0自增到5亿

  • 一种方式没任何锁
  • 另外一个方式每次自增时都取一个锁

分别207毫秒和9603毫秒,性能差近50倍。

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class LockBenchmark {


    public static void runIncrement() {
        long counter = 0;
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter < max) {
            counter++;
        }
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms without lock");
    }


    public static void runIncrementWithLock() {
        Lock lock = new ReentrantLock();
        long counter = 0;
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter < max) {
            if (lock.tryLock()){
                counter++;
                lock.unlock();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms with lock");
    }


    public static void main(String[] args) {
        runIncrement();
        runIncrementWithLock();
    }
}

加锁和不加锁自增counter Time spent is 207ms without lock Time spent is 9603ms with lock 性能差出将近10倍

2 无锁的RingBuffer

加锁很慢,所以Disruptor“无锁”,即没有os层的锁。 Disruptor还利用CPU硬件支持的指令,CAS,Intel CPU对应指令 cmpxchg。

和直接在链表的头和尾加锁不同,RingBuffer创建一个Sequence对象,指向当前的RingBuffer的头和尾。这头和尾的标识不是通过指针实现,而是通过序号,类名叫Sequence。

RingBuffer中进行生产者和消费者之间的资源协调,是对比序号。 当Pro想往队列加新数据,它会把当前Pro的Sequence的序号,加上需要加入的新数据的数量,然后和实际的消费者所在的位置对比,看队列里是否有足够空间加入这些数据,而不会覆盖消费者还没处理完的数据。

Sequence就是通过CAS,即UNSAFE.compareAndSwapLong:

 public boolean compareAndSet(final long expectedValue, final long newValue)
	    {
	        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
	    }


public long addAndGet(final long increment)
    {
        long currentValue;
        long newValue;


        do
        {
            currentValue = get();
            newValue = currentValue + increment;
        }
        while (!compareAndSet(currentValue, newValue));


        return newValue;

Sequence源码中的addAndGet,若CAS失败,会不断忙等待重试。 CAS不是基础库函数,也不是os实现的一个系统调用,而是一个CPU硬件支持的机器指令。Intel CPU的cmpxchg指令,compxchg [ax] (隐式参数,EAX累加器), [bx] (源操作数地址), [cx] (目标操作数地址):

  • 第一个操作数不在指令里面出现,是一个隐式的操作数,也就是EAX累加寄存器里面的值
  • 第二个操作数就是源操作数,并且指令会对比这个操作数和上面的累加寄存器里面的值

若值相同,CPU会把ZF(条件码寄存器里零标志位的值)置1,再把第三个操作数(即目标操作数)设置到源操作数的地址。

不相等,就会把源操作数里的值,设置到累加器寄存器。

对应伪代码:

IF [ax]< == [bx] THEN [ZF] = 1, [bx] = [cx]
                 ELSE [ZF] = 0, [ax] = [bx] 

单指令是原子的,即CAS时,无需再加锁,直接调用。无锁,CPU就像在赛道上行驶,不会遇到需上下文切换红灯而停下来。虽会遇到CAS这样复杂机器指令,就好像赛道上会有U型弯,不过不用完全停等待,CPU运行起来仍快得多。

3 CAS到底多快

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class LockBenchmark {


    public static void runIncrementAtomic()
    {
        AtomicLong counter = new AtomicLong(0);
        long max = 500000000L;
        long start = System.currentTimeMillis();
        while (counter.incrementAndGet() < max) {
        }
        long end = System.currentTimeMillis();
        System.out.println("Time spent is " + (end-start) + "ms with cas");
    }


    public static void main(String[] args) {
        runIncrementAtomic();
    }
}


Time spent is 3867ms with cas

incrementAndGet最终到CPU指令层面,就是CAS操作。它所花费时间,虽比没任何锁的操作慢一个数量级,但比使用ReentrantLock这样的操作系统锁的机制,还是减少一半时间。

4 总结

Java基础库里面的BlockingQueue,都要通过显示地加锁来保障生产者之间、消费者之间,乃至生产者和消费者之间,不会发生锁冲突的问题。

但加锁会大大拖慢性能。获取锁时,CPU没有执行计算相关指令,而要等待os或JVM进行锁竞争裁决。那些没有拿到锁而被挂起等待的线程,则需上下文切换。这上下文切换,会把挂起线程的寄存器里的数据放到线程的程序栈。即加载到高速缓存里面的数据也失效了,程序就变得更慢。

RingBuffer采用无锁方案,通过CAS进行序号自增和对比,使CPU无需获取os锁。而能继续顺序执行CPU指令。无上下文切换、os锁,程序就快。不过因为采用CAS忙等待(Busy-Wait),会使得CPU始终满负荷运转,消耗更多电,小缺点。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 缓慢的锁
    • 1.1 链表数据在内存布局对高速缓存不友好
      • 1.2 锁依赖
      • 2 无锁的RingBuffer
      • 3 CAS到底多快
      • 4 总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档