前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高性能原子类

高性能原子类

作者头像
黑洞代码
发布2021-01-14 15:16:36
6010
发布2021-01-14 15:16:36
举报

高性能原子类的使用

代码语言:javascript
复制
/**
 * @Author: 无双老师【云析学院:http://yunxiedu.net QQ:3190976240 email:zhouguanya20@163.com】
 * @Date: 2020-04-05 16:28
 * @Description: LongAdder使用姿势
 */
public class LongAdderDemo {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();
        // 自增1
        longAdder.increment();
        // 加666
        longAdder.add(666);
        // 打印总和
        System.out.println(longAdder.sum());

        // 创建LongAccumulator,基数为1
        LongAccumulator longAccumulator
                = new LongAccumulator((left, right) -> left + right * 2, 1);
        // 1 + 1 * 2
        longAccumulator.accumulate(1);
        System.out.println(longAccumulator.get());
        // 1 + 1 * 2 + 3 * 2
        longAccumulator.accumulate(3);
        System.out.println(longAccumulator.get());
        // 1 + 1 * 2 + 3 * 2 + -4 * 2
        longAccumulator.accumulate(-4);
        System.out.println(longAccumulator.get());
    }
}

类图

公共父类Striped64是实现中的核心,它实现一些核心操作,处理64位数据,很容易就能转化为其他基本类型,是个通用的类。二元算术运算,指的是你可以给它提供一个二元算术方式,这个类按照你提供的方式进行算术计算,并保存计算结果。二元运算中第一个操作数是累积器中某个计数单元当前的值,另外一个值是外部提供的。

举几个栗子:假设每次操作都需要把原来的数值加上某个值,那么二元运算为 (x, y) -> x+y,这样累积器每次都会加上你提供的数字y,这跟LongAdder的功能基本上是一样的;

假设每次操作都需要把原来的数值变为它的某个倍数,那么可以指定二元运算为 (x, y) -> x * y,累积器每次都会乘以你提供的数字y,y=2时就是通常所说的每次都翻一倍;

假设每次操作都需要把原来的数值变成它的5倍,再加上3,再除以2,再减去4,再乘以你给定的数,最后还要加上6,那么二元运算为 (x, y) -> ((x * 5+3)/2 - 4) * y +6,累积器每次累积操作都会按照你说的做;......

LongAccumulator是标准的实现类,LongAdder是特化的实现类,它的功能等价于LongAccumulator((x, y) -> x+y, 0L)。它们的区别很简单,前者可以进行任何二元算术操作,后者只能进行加减两种算术操作。

Double版本是Long版本的简单改装,相对Long版本,主要的变化就是用Double.longBitsToDouble 和Double.doubleToRawLongBits对底层的8字节数据进行long <---> double转换,存储的时候使用long型,计算的时候转化为double型。这是因为CAS是sun.misc.Unsafe中提供的操作,只对int、long、对象类型(引用或者指针)提供了这种操作,其他类型都需要转化为这三种类型才能进行CAS操作。这里的long型也可以认为是8字节的原始类型,因为把它视为long类型是无意义的。java中没有C语言中的 void* 无类型(或者叫原始类型),只能用最接近的long类型来代替。

Striped64源码

四个类的核心实现都在Striped64中,这个类使用分段的思想,来尽量平摊并发压力。类似1.7版本的ConcurrentHashMap.Segment,Striped64中使用了一个叫Cell的类,是一个普通的二元算术累积单元,线程也是通过hash取模操作映射到一个Cell上进行累积。为了加快取模运算效率,也把Cell数组的大小设置为2^n,同时大量使用Unsafe提供的底层操作。基本的实现桶1.7的ConcurrentHashMap非常像,而且更简单。

累积单元Cell

代码语言:javascript
复制
// 很简单的一个类,这个类可以看成是一个简化的AtomicLong
// 通过cas操作来更新value的值
// @sun.misc.Contended是一个高端的注解,代表使用缓存行填来避免伪共享
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics Unsafe相关的初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Striped64主体代码

代码语言:javascript
复制
abstract class Striped64 extends Number {
    @sun.misc.Contended static final class Cell { ... }
 
    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
 
    // cell数组,长度2^n,可以类比为jdk1.7的ConcurrentHashMap中的segments数组
    transient volatile Cell[] cells;
 
    // 累积器的基本值,在两种情况下会使用:
    // 1、没有遇到并发的情况,直接使用base,速度更快;
    // 2、多线程并发初始化table数组时,必须要保证table数组只被初始化一次,因此只有一个线程能够竞争成功,
	// 这种情况下竞争失败的线程会尝试在base上进行一次累积操作
    transient volatile long base;
 
    // 自旋标识,在对cells进行初始化,或者扩容时,需要通过CAS操作把此标识设置为1,忙标识,相当于加锁
	// 取消busy时可以直接使用cellsBusy = 0,相当于释放锁
    transient volatile int cellsBusy;
 
    Striped64() {
    }
 
    // 使用CAS更新base的值
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
 
    // 使用CAS将cells自旋标识更新为1
    // 更新为0时可以不用CAS,直接使用cellsBusy就行
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
 
    // 下面这两个方法是ThreadLocalRandom中的方法
    // probe翻译过来是探测/探测器/探针这些,它是ThreadLocalRandom里面的一个属性,
    // 这里可以把它理解为线程本身的hash值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }
 
    // 相当于rehash,重新算一遍线程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }
 
    /**
     * 核心方法的实现
     * @param x the value 外部提供的那个操作数
     * @param fn
     *     外部提供的二元算术操作,实例持有并且只能有一个,生命周期内保持不变,
     *     null代表LongAdder这种特殊但是最常用的情况,可以减少一次方法调用
     * @param wasUncontended 表明调用者预先调用的一次CAS操作都失败了
     */
    final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
        int h;
        // 这个if相当于给线程生成一个非0的hash值
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false; // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
			 // cells已经被初始化了
            if ((as = cells) != null && (n = as.length) > 0) {
				// hash取模映射得到的Cell单元还为null(为null表示还没有被使用)
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell 如果没有线程正在执行扩容
                        Cell r = new Cell(x);   // Optimistically create 先创建新的累积单元
                        if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁
                            boolean created = false;
                            try {               // Recheck under lock 在有锁的情况下再检测一遍之前的判断
                                Cell[] rs; int m, j;
								// 考虑别的线程可能执行了扩容,这里重新赋值重新判断
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
								    // 对没有使用的Cell单元进行累积操作
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; //清空自旋标识,释放锁
                            }
							// 如果原本为null的Cell单元是由自己进行第一次累积操作,
							// 那么任务已经完成了,所以可以退出循环
                            if (created)
                                break;
                            continue;           // Slot is now non-empty 不是自己进行第一次累积操作,重头再来
                        }
                    }
					// cellsBusy=1,cells被加锁了,不能往下继续执行
                    collide = false;
                }
				// 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争
                else if (!wasUncontended) // CAS already known to fail
				    // 情况失败标识,后面去重新算一遍线程的hash值
                    wasUncontended = true; // Continue after rehash
				// 尝试CAS更新a.value(进行一次累积) ------ 标记为【分支A】
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
					// 成功了就完成了累积任务,退出循环
                    break;
				// cell数组已经达最大,或者发生了扩容操作。因为NCPU不一定是2^n,所以这里用 >=
                else if (n >= NCPU || cells != as)
					//长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,
					//下面两个else if就永远不会被执行了,也就永远不会再进行扩容
                    collide = false; // At max size or stale
                // CPU能够并行的CAS操作的最大数量是它的核心数
				//(CAS在x86中对应的指令是cmpxchg,多核需要通过锁缓存来保证整体原子性),
				//当n >= NCPU时,再出现几个线程映射到同一个Cell导致CAS竞争的情况,
				//那就真不关扩容的事了,完全是hash值的锅了
				
				// 映射到的Cell单元不是null,并且尝试对它进行累积时,
				//CAS竞争失败了,这时候把扩容意向设置为true
                else if (!collide)
                     // 下一次循环如果还是跟这一次一样,说明竞争很严重,那么就真正扩容
					  // 把扩容意向设置为true,只有这里才会给collide赋值为true,
					  // 也只有执行了这一句,才可能执行后面一个else if进行扩容
                    collide = true;
				// 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为【分支B】
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
						// 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍)
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1]; // 执行2倍扩容
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0; // 释放锁
                    }
                    collide = false; // 扩容意向为false
                    continue; // Retry with expanded table 扩容后重头再来
                }
				// 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况
                h = advanceProbe(h);
            }
			// cells没有被加锁,并且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
					// CAS避免不了ABA问题,这里再检测一次,如果还是null,或者空数组,那么就执行初始化
                    if (cells == as) {
						// 初始化时只创建两个单元
                        Cell[] rs = new Cell[2];
						// 对其中一个单元进行累积操作,另一个不管,继续为null
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
					// 清空自旋标识,释放锁
                    cellsBusy = 0;
                }
				 // 如果某个原本为null的Cell单元是由自己进行第一次累积操作,
				 // 那么任务已经完成了,所以可以退出循环
                if (init)
                    break;
            }
			// cells正在进行初始化时,尝试直接在base上进行累加操作
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
				直接在base上进行累积操作成功了,任务完成,可以退出循环了
                break;                          // Fall back on using base
        }
    }
 
    // double跟long的逻辑基本上是一样的
    final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended);
 
    // Unsafe mechanics Unsafe初始化
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
}

分支A是用CAS更新对应的cell.value,是个写操作,分支B是进行扩容。ConcurrentHashMap中,扩容和写操作是会严格处理的,在一个分段锁管辖区内,不会出现扩容和写操作并发:1.7的扩容操作都是在put内部执行的,put本身就会加锁,因此扩容进行时会阻塞对同一个Segment的写操作;1.8中扩容时,put/remove等方法如果碰见正在其他线程正在执行扩容,会去帮助扩容,扩容完成了之后才会去尝试加锁执行真正的写操作。

虽然B分支会进行”加锁“,但是A操作跟cellsBusy无关,”加锁“并不禁止A操作的执行。AB两个分支是不互斥的, 因此Striped64这里会出现A分支的写操作,和B分支扩容操作并发执行的情况。

那么问题是:为什么这么并发执行没问题?仔细看看A操作,就明白了。A操作使用CAS更新Cell对象中的某个属性,并不改变数组持有的Cell对象的引用。B操作进行的是数组持有的Cell对象引用的复制,复制后引用指向的还是原来的那个Cell对象。

举个例子就是,旧的cell数组,叫作old,old[1] = cellA,cellA.value = 1,扩容后的新数组,叫作new,仍然有new[1] = cellA。A分支实际上执行的是cellA.value = 2,无论分支A和B怎么并发执行,执行完成后新数组都能看到分支A对Cell的改变,扩容前后实际上数组持有的是同一群Cell对象。

这下就知道为什么不直接用long变量代替Cell对象了吧。long[]进行复制时,两个数组完完全全分离了,A分支直接作用在旧数组上,B分支扩容后,看不到串行复制执行后对旧数组同一位置的改变。举个例子就是,old[1]=10,A分支要把old[1]更新为11,这时候B分支已经复制到old[5]了,A分支执行完成后,B分支创建的新数组new[1]可能还是10(不管是多少,反正没记录A分支的操作),这样A分支的操作就被遗失了,程序会有问题。下面简单画了个示意图,可以看看。

LongAccumulator原理

代码语言:javascript
复制
/**
 * Updates with the given value.
 *
 * @param x the value
 */
public void accumulate(long x) {
	Cell[] as; long b, v, r; int m; Cell a;
	if ((as = cells) != null ||
		(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
		boolean uncontended = true;
		if (as == null || (m = as.length - 1) < 0 ||
			(a = as[getProbe() & m]) == null ||
			!(uncontended =
			  (r = function.applyAsLong(v = a.value, x)) == v ||
			  a.cas(v, r)))
			longAccumulate(x, function, uncontended);
	}
}

accumulate方法主要是通过longAccumulate方法实现累加。

代码语言:javascript
复制
/**
 * Returns the current value.  The returned value is <em>NOT</em>
 * an atomic snapshot; invocation in the absence of concurrent
 * updates returns an accurate result, but concurrent updates that
 * occur while the value is being calculated might not be
 * incorporated.
 *
 * @return the current value
 */
public long get() {
	Cell[] as = cells; Cell a;
	long result = base;
	if (as != null) {
		for (int i = 0; i < as.length; ++i) {
			if ((a = as[i]) != null)
				result = function.applyAsLong(result, a.value);
		}
	}
	return result;
}

获取最终结果的时候,其实是for循环遍历cell数组,然后将cell数组每个位置上的元素都进行一次汇总。

LongAdder原理

LongAdder类是JDK1.8新增的一个原子性操作类。AtomicLong通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,但是JDK开发组并不满足于此,因为非常高并发的请求下AtomicLong的性能是不能让人接受的。

如下AtomicLong 的incrementAndGet的代码,虽然AtomicLong使用CAS算法,但是CAS失败后还是通过无限循环的自旋锁不停的尝试,这就是高并发下CAS性能低下的原因所在。源码如下:

代码语言:javascript
复制
public final long incrementAndGet() {
	for (;;) {
		long current = get();
		long next = current + 1;
		if (compareAndSet(current, next))
			return next;
	}
}

在高并发下N个线程同时去操作一个变量会造成大量线程CAS失败,然后处于自旋状态,这样导致大大浪费CPU资源,降低了并发性。

既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,那么性能问题不久迎刃而解了吗?

没错,因此,JDK8 提供的LongAdder就是这个思路。下面通过图形来标示两者的不同,如下图:

上图是多个线程同时竞争同一个AtomicLong变量的情景。

如上图所示,LongAdder则是内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。

代码语言:javascript
复制
public class LongAdder extends Striped64 implements Serializable {
 
    // 构造方法,什么也不做,直接使用默认值,base = 0, cells = null
    public LongAdder() {
    }
 
    // add方法,根据父类的longAccumulate方法的要求,这里要进行一次CAS操作
    // (虽然这里有两个CAS,但是第一个CAS成功了就不会执行第二个,要执行第二个,第一个就被“短路”了不会被执行)
    // 在线程竞争不激烈时,这样做更快
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
 
    public void increment() {
        add(1L);
    }
 
    public void decrement() {
        add(-1L);
    }
 
    // 返回累加的和,也就是“当前时刻”的计数值
    // 此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,
    //     方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值
    // 高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值,但是全局加锁在高并发情况下是下下策
    // 在很多的并发场景中,计数操作并不是核心,这种情况下允许计数器的值出现一点偏差,此时可以使用LongAdder
    // 在必须依赖准确计数值的场景中,应该自己处理而不是使用通用的类
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
 
    // 重置计数器,只应该在明确没有并发的情况下调用,可以用来避免重新new一个LongAdder
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
 
    // 相当于sum()后再调用reset()
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }
 
    // 其他的不说了
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 高性能原子类的使用
  • 类图
  • Striped64源码
    • 累积单元Cell
      • Striped64主体代码
      • LongAccumulator原理
      • LongAdder原理
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档