前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LongAdder源码【原创+图解+视频讲解】

LongAdder源码【原创+图解+视频讲解】

作者头像
CaesarChang张旭
发布2023-01-06 09:14:52
3050
发布2023-01-06 09:14:52
举报
文章被收录于专栏:悟道悟道

目录

AtomicLong用法

源码分析

问题

解决

LongAdder用法

高并发下效率测试

原理

源码

add(long x)

Striped64的longAccumulate  

伪共享

总结

视频讲解:


AtomicLong用法

代码语言:javascript
复制
public static void main(String[] args) {
        AtomicLong i = new AtomicLong(0);
​
        System.out.println(i.getAndIncrement());
        System.out.println(i.addAndGet(30));
    }

源码分析

如果value被修改了, CAS失败, 那就获取最新值,继续CAS, 直到成功~

问题

如果并发特别大,修改频繁,那么你要CAS自旋很多次, 效率大大降低

代码语言:javascript
复制
AtomicLong 的 Add 操作是依赖自旋不断的 CAS 去累加一个 Long 值。
如果在竞争激烈的情况下,CAS 操作不断的失败,就会有大量的线程不断的自旋尝试 CAS 会造成 CPU 的极大的消耗

解决

使用LongAdder

LongAdder用法

代码语言:javascript
复制
       LongAdder longAdder = new LongAdder();
        longAdder.increment();
        longAdder.add(10);
        longAdder.sum();

高并发下效率测试

代码语言:javascript
复制
@Slf4j
public class Demo12 {
    public static void main(String[] args) throws Exception {
            long start1 = System.currentTimeMillis();
            testLongAdder();
            System.out.println("LongAdder 耗时:" + (System.currentTimeMillis() - start1) + "ms");
            long start2 = System.currentTimeMillis();
            testAtomicLong();
            System.out.println("AtomicLong 耗时:" + (System.currentTimeMillis() - start2) + "ms");
            System.out.println("----------------------------------------");
        }
​
​
        static void testAtomicLong() throws Exception{
            AtomicLong atomicLong = new AtomicLong();
            List<Thread> list = new ArrayList();
            for (int i = 0; i < 9999; i++) {
                list.add(new Thread(() -> {
                    for(int j=0;j<100000;j++){
                        atomicLong.incrementAndGet();
                    }
                }));
            }
​
            for (Thread thread : list) {
                thread.start();
            }
            for (Thread thread : list) {
                thread.join();
            }
​
​
            System.out.println("AtomicLong value is : " + atomicLong.get());
        }
​
        static void testLongAdder() throws Exception{
            LongAdder longAdder = new LongAdder();
            List<Thread> list = new ArrayList();
            for (int i = 0; i < 9999; i++) {
                list.add(new Thread(() -> {
                    for(int j=0;j<100000;j++){
                        longAdder.increment();
                    }
                }));
            }
​
            for (Thread thread : list) {
                thread.start();
            }
            for (Thread thread : list) {
                thread.join();
            }
​
​
            System.out.println("LongAdder value is : " + longAdder.longValue());
        }
​
}

原理

代码看不懂,请看我讲解视频哈链接:

点我进入

源码

add(long x)

代码语言:javascript
复制
public class LongAdder extends Striped64 implements Serializable {
       public void add(long x) {
        //as 表示cells 引用
        //b  表示 base
        //v 表示 期望值表示获取的base值
        //m 表示cells 数组的长度
        //a 表示当前线程命中的 cell单元格
        Cell[] as; long b, v; int m; Cell a;
​
        //条件一: true-> 表示cells已经初始化过了,当前线程应该将数据写入到对应的cell中
            //  false-> 表示cells 未初始化,当前所有线程应该将数据写入base中
​
        //条件二:true-> 表示当前线程cas替换数据成功
            //false-> 表示发生竞争了,可能需要重试 或者 扩容
            //true 是指casBase(b = base, b + x),不是整体,是部分
            //true时,取反是false
        if ((as = cells) != null || !casBase(b = base, b + x)) {
​
            //true 未竞争      false 发生竞争
            boolean uncontended = true;
​
                //条件一:true-> 说明 cells 未初始化,也就是多线程写base发生竞争
                    // false-> 说明 cells 已经初始化了,当前线程应该是 找自己的cell 写值
            if (as == null || (m = as.length - 1) < 0 ||
​
                //条件二:getProbe() 获取当前线程的hash值, m表示 cells长度-1 cells长度 一定是2的次方数 16-1=15=1111 二进制
                    // true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
                    // false-> 说明当前线程对应的cell 不为空,说明 下一步想要将x值,添加到cell中
                (a = as[getProbe() & m]) == null ||
​
                //条件三:整体而言,有取反,true-> uncontended为false cas操作失败,意味着当前线程对应的cell 有竞争
                    //false-> 表示cas成功
                !(uncontended = a.cas(v = a.value, v + x)))
​
                    //哪些情况会调用
                    //true-> 说明 cells 未初始化,也就是多线程写base发生竞争
                    //true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
                    //true-> cas操作失败,意味着当前线程对应的cell 有竞争
                longAccumulate(x, null, uncontended);
        }
    }
}
​

Striped64的longAccumulate  

代码看不懂,请看我讲解视频哈链接:

【总结者】LongAdder源码讲解(图解+代码逐行分析)4K面试必看_哔哩哔哩_bilibili

点我进入

Striped64是一种高并发累加器,有效解决了原子类累加的弊端。Striped64将线程竞争的操作分散开来,每个线程操作一个cell,而sum则等于base和所有cell值的和。

代码语言:javascript
复制
abstract class Striped64 extends Number {
    //1、true-> 说明 cells 未初始化,也就是多线程写base发生竞争
    //2、true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
    //3、true-> cas操作失败,意味着当前线程对应的cell 有竞争
    //longAccumulate(x, null, uncontended);
​
    //wasUncontended :只有cells 初始化之后,并且当前线程 竞争修改失败,才会是false
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // h 表示线程hash值
        int h;
        //条件成立:说明当前线程 还未分配线程
        if ((h = getProbe()) == 0) {
            //给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            //取出当前线程的hash值
            h = getProbe();
            // 因为默认情况下 , 肯定是写入到了 cells[0]位置,不把它当做一次真正的竞争
            //因为没有hash值,所有才会在cells[0]竞争,所有表示一次真正意义上的竞争
            wasUncontended = true;
        }
​
        //表示扩容意向  false 一定不会扩容,true 可能会扩容
        boolean collide = false;                // True if last slot nonempty
​
        //自旋
        for (;;) {
            // as 表示cells引用
            // a  表示当前线程命中的cell
            // n  表示cells 数组长度
            // v  表示 期望值
            Cell[] as; Cell a; int n; long v;
​
            // CASE1:表示cells已经初始化了,当前线程应该将数据写入到对应的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
                //下面两种情况才会进入该if
                //2、true-> 说明当前线程对应下标的cell 为空 ,需要创建longAccumulate 支持
                //3、true-> cas操作失败,意味着当前线程对应的cell 有竞争
​
                // CASE1.1: true->表示当前线程 对应的下标未知的cell 为null,需要创建new Cell
                if ((a = as[(n - 1) & h]) == null) {
​
                    //true -> 表示当前 锁未被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
​
                        //拿当前的x创建Cell
                        Cell r = new Cell(x);   // Optimistically create
​
                        //条件一:true-> 表示当前锁未被占用 false -> 表示锁被占用
                        //条件二:true-> 表示当前线程获取锁成功, false-> 当前线程获取锁失败
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                // rs 表示当前cells引用
                                // m  表示cells长度
                                // j  表示当前线程命中的下标
                                Cell[] rs; int m, j;
​
                                //条件一 条件二  恒成立
                                //rs[j = (m - 1) & h] == null 是为了防止其它线程初始化 该位置,然后当前线程再次初始化该位置
                                //在上面最近的if 时,其它线程可能抢走锁,让 rs[j]已经赋值,当该线程抢回锁之后可以防止重复赋值
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    //扩容意向更改为false      (a = as[(n - 1) & h]) == null
                    //因为当前cell为 null,cells够用,所以不需要扩容
                    collide = false;
                }
                //CASE1.2:
                //只有当前线程的hash值不是0,并且和其它线程一起打在了同一个cell上时,wasUncontended才是false
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
​
                //CASE1.3:当前线程rehash 过 hash值,然后新命中的cell不为空
                //true -> 写成功,退出循环
                //false -> 表示rehash之后命中的cell 也有竞争,重试一次
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //CASE1.4:
                //条件一:n >= NCPU   true-> 扩容意向为false,表示不扩容了  false-> 表示还可以扩容
                //条件二:cells != as   true -> 其它线程已经扩容过了,当前线程rehash
                else if (n >= NCPU || cells != as)
                    //扩容意向:改为false,表示不扩容了
                    collide = false;            // At max size or stale
                //CASE 1.5:
                //collide取反 设置扩容意向 为true,但不一定扩容
                else if (!collide)
                    collide = true;
                //CASE 1.6:真正的扩容
                // 条件一:cellsBusy == 0 当前线程无锁状态,可以去竞争
                // 条件二:casCellsBusy() 当前线程获取锁,true:可以做扩容逻辑,false:表示其它线程正在做扩容相关操作
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //cells == as 当其它线程抢了线程,先扩容,当前线程抢回后,可以防止再次扩容
                        // 防止重复扩容
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //重置hash值 rehash
                h = advanceProbe(h);
            }
​
            // CASE2:前置条件cells 还未初始化 as 为 null
            // 条件一:cellsBusy 表示未加锁
            // 条件二:cells == as? 因为其他线程可能会在你给as赋值之后 修改了 cells
            // 条件三:true 表示获取锁成功 会把cellsBusy = ,false表示其他线程正在持有锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                //1、true-> 说明 cells 未初始化,也就是多线程写base发生竞争
​
                boolean init = false;
                try {                           // Initialize table
                    // cells == as 又要对比,防止再次初始化
                    // 防止其他线程已经初始化了,当前线程再次初始化 导致丢失数据
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // CASE3:
            // 1、当前cellBusy 加锁状态,表示其它线程正在初始化cells,所有当前线程将值累加到base
            // 2、cells被其他线程初始化后,当前线程需要将数据累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
}
​

伪共享

上面可以看到 Cell 类被 @sun.misc.Contended 注解了, 是用来避免缓存的伪共享

CPU Cache Line

代码语言:javascript
复制
在计算机的架构中 L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。
    L1缓存很小但很快,并且紧靠着在使用它的CPU内核;
    L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;
    L3更大、更慢,并且被单个插槽上的所有CPU核共享;
    最后是主存,由全部插槽上的所有CPU核共享
代码语言:javascript
复制
Cache 是由很多个cache line(缓存行)组成的。
每个cache line通常是 64 字节,并且它有效地引用主内存中的一块地址。
一个Java的long类型变量是 8 字节,因此在一个缓存行中可以存 8 个long类型的变量
代码语言:javascript
复制
多个线程并发的修改一个缓存行中的不同变量的时候, 
    比如CPU1更新a, CPU2更新b, 但是a和b在同一个缓存行上,每个线程都要去竞争缓存行的所有权来更新变量。 
        如果核心1获得了所有权,缓存子系统将会使核心2中对应的缓存行失效。
        当核心2获得了所有权然后执行更新操作,核心1就要使自己对应的缓存行失效。
     这会来来回回的经过L3缓存,大大影响了性能。

避免伪共享

避免伪共享的情况出现, 就需要让可能出现线程竞争的变量分开到不同的 Cache Line 中, 使用空间换时间的思维

JDK8 有专门的注解 @Contended 来避免伪共享

总结

代码语言:javascript
复制
add(long x):
    如果 cells 数组不为空, 或者 cas 操作 base 失败, 则进入longAccumulate
    
longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended):
    整个 for(;;) 死循环,都是以 cas 操作成功而告终。否则则会修改上述描述的几个标记位,重新进入循环。
      包含几种情况:
        cells 不为空
            如果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
            如果 cas 失败,继续循环
            如果 cell 不为空,且 cell cas 成功,退出
            如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试。(扩容没意义)
            设置 collide 为 true。
            获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
        cells 为空且获取到 cellsBusy ,init cells 数组,然后赋值退出。
        cellsBusy 获取失败,则进行 baseCas ,操作成功退出,不成功则重试。

视频讲解:

代码看不懂,请看我讲解视频哈链接:

【总结者】LongAdder源码讲解(图解+代码逐行分析)4K面试必看_哔哩哔哩_bilibili

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AtomicLong用法
  • 源码分析
    • 问题
      • 解决
        • LongAdder用法
        • 高并发下效率测试
        • add(long x)
        • Striped64的longAccumulate  
        • 伪共享
    • 原理
    • 源码
    • 总结
    • 视频讲解:
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档