前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅析LongAdder

浅析LongAdder

作者头像
LNAmp
发布2018-09-05 15:26:59
4500
发布2018-09-05 15:26:59
举报

浅析LongAdder

前言

上文中分析了AtomicLong以及Unsafe,本文将为大家带来LongAdder的分析.LongAdder之前在guava以及hystrix等中出现,但是目前已经出现在jdk8标准库中了,作者是著名的Doug lea大师。

基本分析

先看看LongAdder的java doc的描述:

One or more variables that together maintain an initially zero {@code long} sum. When updates (method {@link #add}) are contended across threads, the set of variables may grow dynamically to reduce contention. Method {@link #sum} (or, equivalently, {@link longValue}) returns the current total combined across the variables maintaining the sum. <p> This class is usually preferable to {@link AtomicLong} when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption. <p>This class extends {@link Number}, but does <em>not</em> define methods such as {@code hashCode} and {@code compareTo} because instances are expected to be mutated, and so are not useful as collection keys. jsr166e note: This class is targeted to be placed in java.util.concurrent.atomic

翻译过来就是说: LongAdder中会维护一个或多个变量,这些变量共同组成一个long型的“和”。当多个线程同时更新(特指“add”)值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。 当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder类比AtomicLong更好用。 在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。

从上面的java doc来看,LongAdder有两大方法,add和sum。其更适合使用在多线程统计计数的场景下,在这个限定的场景下比AtomicLong要高效一些,下面我们来分析下为啥在这种场景下LongAdder会更高效。

add方法

代码语言:javascript
复制
public void add(long x) {
        Cell[] as; long b, v; HashCode hc; Cell a; int n;
        //首先判断cells是否还没被初始化,并且尝试对value值进行cas操作
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //查看当前线程中HashCode中存储的随机值
            int h = (hc = threadHashCode.get()).code;
            //此处有多个判断条件,依次是
            //1.cell[]数组还未初始化
            //2.cell[]数组虽然初始化了但是数组长度为0
            //3.该线程所对应的cell为null,其中要注意的是,当n为2的n次幂时,((n - 1) & h)等效于h%n
            //4.尝试对该线程对应的cell单元进行cas更新(加上x)
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //在以上条件都失效的情况下,重试update
                retryUpdate(x, hc, uncontended);
        }
    }

    //一个ThreadLocal类
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        public HashCode initialValue() { return new HashCode(); }
    }

    
    static final ThreadHashCode threadHashCode = new ThreadHashCode();


    //每个HashCode在初始化时会产生并保存一个非0的随机数
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
            code = (h == 0) ? 1 : h;
        }
    }
    
    //尝试使用casBase对value值进行update,baseOffset是value相对于LongAdder对象初始位置的内存偏移量
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val);
    }

add方法的注释在其中,让我们再看看重要的retryUpdate方法。retryUpdate在上述四个条件都失败的情况下尝试再次update,我们猜测在四个条件都失败的情况下在retryUpdate中肯定都对应四个条件失败的处理方法,并且update一定要成功,所以肯定有相应的循环+cas的方式出现。

代码语言:javascript
复制
final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
        int h = hc.code;
        boolean collide = false;                // True if last slot nonempty
        //我们猜测的for循环
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //这个if分支处理上述四个条件中的3和4,此时cells数组已经初始化了并且长度大于0
            if ((as = cells) != null && (n = as.length) > 0) {
                //该分支处理四个条件中的3分支,线程对应的cell为null
                if ((a = as[(n - 1) & h]) == null) {
                    //如果busy锁没被占有
                    if (busy == 0) {            // Try to attach new Cell
                        //新建一个cell
                        Cell r = new Cell(x);   // Optimistically create
                        //double check busy,并且尝试锁busy(乐观锁)
                        if (busy == 0 && casBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //再次确认线程hashcode所对应的cell为null,将新建的cell赋值
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                            //解锁
                                busy = 0;
                            }
                            if (created)
                                break;
                            //如果失败,再次尝试
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //处理四个条件中的条件4,置为true后交给循环重试
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //尝试给线程对应的cell update
                else if (a.cas(v = a.value, fn(v, x)))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //在以上办法都不管用的情况下尝试扩大cell
                else if (busy == 0 && casBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                        //扩大一倍,将前N个拷贝过去
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        busy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //rehash下,走到这一步基本是因为多个线程的竞争太激烈了,所以在扩展cell后rehash h,等待下次循环处理好这次更新
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            }
            //主要针对上述四个条件中的1.2,此时cells还未进行第一次初始化,其中casBusy的理解参照下面busy的      注释,如果casBusy能成功才进入这个分支
            else if (busy == 0 && cells == as && casBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        //创建数量为2的cell数组,2很重要,因为每次都是n<<1进行扩大一倍的,所以n永远是2的幂
                        Cell[] rs = new Cell[2];
                        //需要注意的是h&1 = h%2,将线程对应的cell初始值设置为x
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                //释放busy锁
                    busy = 0;
                }
                if (init)
                    break;
            }
            //busy锁不成功或者忙,则再重试一次casBase对value直接累加
            else if (casBase(v = base, fn(v, x)))
                break;                          // Fall back on using base
        }
        hc.code = h;                            // Record index for next time
    }
    
    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     通过cas实现的自旋锁,用于扩大或者初始化cells
     */
    transient volatile int busy;

从以上分析来看,retryUpdate非常的复杂,所做的努力就是为了尽量减少多个线程更新同一个值value,能用简单的方式解决的绝对不采用开销更大的方法(resize cell也是走投无路的时候)

回过头来总结分析下LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效的原因

  • 首先和AtomicLong一样,都会先采用cas方式更新值
  • 在初次cas方式失败的情况下(通常证明多个线程同时想更新这个值),尝试将这个值分隔成多个cell(sum的时候求和就好),让这些竞争的线程只管更新自己所属的cell(因为在rehash之前,每个线程中存储的hashcode不会变,所以每次都应该会找到同一个cell),这样就将竞争压力分散了

sum方法

代码语言:javascript
复制
public long sum() {
        long sum = base;
        Cell[] as = cells;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

sum方法就简单多了,将cell数组中的value求和就好

AtomicLong可否可以被LongAdder替代

有了传说中更高效的LongAdder,那AtomicLong可否不使用了呢?当然不是!

答案就在LongAdder的java doc中,从我们翻译的那段可以看出,LongAdder适合的场景是统计求和计数的场景,而且LongAdder基本只提供了add方法,而AtomicLong还具有cas方法(要使用cas,在不直接使用unsafe之外只能借助AtomicXXX了)

LongAdder有啥用

从java doc中可以看出,其适用于统计计数的场景,例如计算qps这种场景。在高并发场景下,qps这个值会被多个线程频繁更新的,所以LongAdder很适合。HystrixRollingNumber就是用了它,下篇文章介绍它

总结

本文简单分析了下LongAdder,下篇文章介绍HystrixRollingNumber

留个悬念

代码语言:javascript
复制
  static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }

    }

Cell单元为啥要这么设计?volatile long p0, p1, p2, p3, p4, p5, p6; volatile long q0, q1, q2, q3, q4, q5, q6;这些看起来没用的p,q可以删掉吗?

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016.10.27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 基本分析
    • add方法
      • sum方法
        • AtomicLong可否可以被LongAdder替代
          • LongAdder有啥用
          • 总结
          • 留个悬念
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档