前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程原理剖析——Java并发包中原子操作类原理剖析

并发编程原理剖析——Java并发包中原子操作类原理剖析

作者头像
须臾之余
发布2019-07-10 10:13:59
5080
发布2019-07-10 10:13:59
举报
文章被收录于专栏:须臾之余须臾之余

原子变量操作类

JUC并发包中包含有AtomicInteger、AtomicLong和AtomicBoolean等原子性操作类。他们的原理类似,下面开始分析这些原子操作类的实现原理。

代码语言:javascript
复制
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();//(1)获取Unsafe实例
    private static final long valueOffset;//(2)存放value的偏移量

    /**
     * Records whether the underlying JVM supports lockless
     * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();//(3)判断JVM是否支持Long类型无锁CAS

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset//(4)获取value在AtomicLong中的偏移量
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;//(5)实际变量值

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

代码(1)通过Unsafe。getUnsafe()方法获取到Unsafe类的实例,因为AtomicLong类也是在rt.jar包下的,AtomicLong类就是通过BootStrap类加载器加载的。

代码(5)中的value被声明为volatile的,是保证在多线程环境下内存的可见性。value是具体存放计数器的变量。

代码(2)(4)获取value变量在AtomicLong类中的偏移量。

下面重点看下AtomincLong中的主要函数。

1.递增和递减操作代码

代码语言:javascript
复制
/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
//(6)调用unsafe方法,原子性设置value值为原始值+1,返回指为递增之后的值
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
代码语言:javascript
复制
/**
 * Atomically decrements by one the current value.
 *
 * @return the updated value
 */
//(7)调用unsafe方法,原子性操作设置为value值为原始值-1,返回指为递减之后的值
public final long decrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
代码语言:javascript
复制
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
//(8)调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}
代码语言:javascript
复制
/**
 * Atomically decrements by one the current value.
 *
 * @return the previous value
 */
//调用unsafe方法,原子性设置value值为原始值-1,返回值为原始值
public final long getAndDecrement() {
    return unsafe.getAndAddLong(this, valueOffset, -1L);
}

在如上代码内部都是调用Unsafe的getAndAddLong方法来实现操作的,这个函数是个原子性操作,这里第一个参数是AtomicLong实例的引用,第二个参数是value变量在AtomicLong中的偏移量,第三个参数是要设置的第二个变量的值。

其中,getAndIncremnet方法在JDK7中的实现逻辑为

public final long getAndIncrement(){ while(true){ long current=get(); long next=current+1; if(compareAndSet(current,next)) return current; } }

在如上代码中,每个线程是先拿到变量的当前值(由于value是volatile变量,所以这了拿到的是最新的值),然后在工作内存中对其进行增1,而后使用CAS修改变量的值,如果设置失败,则循环尝试,直到设置成功。

而JDK8的逻辑为

代码语言:javascript
复制
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}
代码语言:javascript
复制
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}

可以看到,JDK7的AtomicLong中的循环逻辑已经被JDK8中的原子操作类UNsafe内置了,之所以内置应该是考虑这个函数在其它地方也可能用到,而内置可以提高复用性。

代码语言:javascript
复制
/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}如上代码可知,在内部还是调用了unsafe.compareAndSwapLong方法。如果原子变量中的value值等于expect,则使用update值更新该值并返回true,否则返回false。

下面通过一个多线程使用AtomicLong统计0个数的例子来理解

代码语言:javascript
复制
public class Atomic {
    //创建Long型原子计数器
    private static AtomicLong atomicLong=new AtomicLong();
    //创建数据源
    private static Integer[] arrayOne=new Integer[]{0,1,2,3,0,5,6,0,56,0};
    private static Integer[] arrayTwo=new Integer[]{0,1,2,3,0,5,6,0,56,0};

    public static void main(String[] args) throws InterruptedException {
        //线程One去统计数组arrayOne中0的个数
        Thread threadOne = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayOne.length;
                for (int i = 0; i < size; ++i) {
                    if (arrayOne[i].intValue() == 0) {
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });
        //线程Two去统计数组arrayOne中0的个数
        Thread threadTwo = new Thread(new Runnable() {
            @Override
            public void run() {
                int size = arrayTwo.length;
                for (int i = 0; i < size; ++i) {
                    if (arrayTwo[i].intValue() == 0) {
                        atomicLong.incrementAndGet();
                    }
                }
            }
        });
        threadOne.start();
        threadTwo.start();

        threadOne.join();
        threadTwo.join();

        long end=System.nanoTime();
        System.out.println("count0:"+atomicLong.get());

    }
}

如上代码中的两个线程各自统计自己所持数据中0的个数,每找到一个0就会调用AtomicLong的原子递增方法。

在没有原子类的情况下,实际计数器需要使用一定的同步措施,比如使用synchronized关键字。但是这些都是阻塞算法,对性能有一定的损耗,而本章介绍的这些原子操作类都使用CAS非阻塞式算法,性能更好。但是在高并发情况下AtomicLong会存在性能问题。JDK8提供了一个在高并发下性能更好的LongAddr类。下面开始讲解下这个类。

JDK8新增的原子操作类LongAddr

引入原因:AtomicLong通过CAS提供了非阻塞式算法的原子性操作,相比使用阻塞式算法的同步器来说它的性能已经算很?了,但是JDK开发组的人并不满足。使用AtomicLong时,在高并发下大量线程会同时去竞争同一个原子变量,由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS操作,而这会白白浪费CPU资源。

因此JDK8新增了一个原子性递增或者递减类LongAddr用来克服在高并发下使用AtomicLong的缺点。既然AtomicLong的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么把一个变量分解为多个变量,让同样多大额线程去竞争多个资源,是不是就解决了性能问题?是的,LongAddr就是这个思路。

下面比较两者设计不同之处

如上图所示,LongAdder则是内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同一个原子变量时候, 如果失败并不是自旋CAS重试,而是尝试获取其他原子变量的锁,最后当获取当前值时候是把所有变量的值累加后再加上base的值返回的。 LongAdder维护了要给延迟初始化的原子性更新数组和一个基值变量base数组的大小保持是2的N次方大小,数组表的下标使用每个线程的hashcode值的掩码表示,数组里面的变量实体是Cell类型。 Cell 类型是Atomic的一个改进,用来减少缓存的争用,对于大多数原子操作字节填充是浪费的,因为原子操作都是无规律的分散在内存中进行的,多个原子性操作彼此之间是没有接触的,但是原子性数组元素彼此相邻存放将能经常共享缓存行,也就是伪共享。所以这在性能上是一个提升。 另外由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候再创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

接下来进行LongAdder代码简单分析

这里我只是简单的介绍一下代码的实现,详细实现,大家可以翻看代码去研究。为了降低高并发下多线程对一个变量CAS争夺失败后大量线程会自旋而造成降低并发性能问题,LongAdder内部通过根据并发请求量来维护多个Cell元素(一个动态的Cell数组)来分担对单个变量进行争夺资源。

首先我们先看LongAdder的构造类图,如下图:

可以看到LongAdder继承自Striped64类,Striped64内部维护着三个变量,LongAdder的真实值其实就是base的值与Cell数组里面所有Cell元素值的累加,base是个基础值,默认是0,cellBusy用来实现自旋锁,当创建Cell元素或者扩容Cell数组时候用来进行线程间的同步。

接下来进去源码如看Cell的构造,源码如下:

代码语言:javascript
复制
/**
 * Padded variant of AtomicLong supporting only raw accesses plus CAS.
 *
 * JVM intrinsics note: It would be possible to use a release-only
 * form of CAS here, if it were provided.
 */
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

正如上面的代码可以知道Cell的构造很简单,内部维护一个声明volatile的变量,这里声明为volatile是因为线程操作value变量时候没有使用锁,为了保证变量的内存可见性这里只有声明为volatile。另外这里就是先前文件所说的使用Unsafe类的方法来设置value的值

接下来进入LongAdder的源码里面去看几个重要的方法,如下:

1.long sum() 方法:返回当前的值,内部操作是累加所有 Cell 内部的 value 的值后累加 base,如下代码,由于计算总和时候没有对 Cell 数组进行加锁,所以在累加过程中可能有其它线程对 Cell 中的值进行了修改,也有可能数组进行了扩容,所以 sum 返回的值并不是非常精确的,

返回值并不是一个调用 sum 方法时候的一个原子快照值。

源码如下:

代码语言:javascript
复制
/**
 * Returns the current sum.  The returned value is <em>NOT</em> an
 * atomic snapshot; invocation in the absence of concurrent
 * updates returns an accurate result, but concurrent updates that
 * occur while the sum is being calculated might not be
 * incorporated.
 *
 * @return the sum
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

2.void reset() 方法:重置操作,如下代码把 base 置为 0,如果 Cell 数组有元素,则元素值重置为 0。源码如下:

代码语言:javascript
复制
/**
 * Resets variables maintaining the sum to zero.  This method may
 * be a useful alternative to creating a new adder, but is only
 * effective if there are no concurrent updates.  Because this
 * method is intrinsically racy, it should only be used when it is
 * known that no threads are concurrently updating.
 */
public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

3.long sumThenReset() 方法:是sum 的改造版本,如下代码,在计算 sum 累加对应的 cell 值后,把当前 cell 的值重置为 0,base 重置为 0。 当多线程调用该方法时候会有问题,比如考虑第一个调用线程会清空 Cell 的值,后一个线程调用时候累加时候累加的都是 0 值。

源码如下:

代码语言:javascript
复制
/**
 * Equivalent in effect to {@link #sum} followed by {@link
 * #reset}. This method may apply for example during quiescent
 * points between multithreaded computations.  If there are
 * updates concurrent with this method, the returned value is
 * <em>not</em> guaranteed to be the final value occurring before
 * the reset.
 *
 * @return the sum
 */
public long sumThenReset() {
    Cell[] as = cells; Cell a;
    long sum = base;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null) {
                sum += a.value;
                a.value = 0L;
            }
        }
    }
    return sum;
}

 4.long longValue() 等价于 sum(),源码如下:

代码语言:javascript
复制
public long longValue() {
    return sum();
}

 5.void add(long x) 累加增量 x 到原子变量,这个过程是原子性的。源码如下:

代码语言:javascript
复制
public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {//(1)
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||//(2)
                (a = as[getProbe() & m]) == null ||//(3)
                !(uncontended = a.cas(v = a.value, v + x)))//(4)
                longAccumulate(x, null, uncontended);//(5)
        }
    }

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

可以看到上面代码,当第一个线程A执行add时候,代码(1)会执行casBase方法,通过CAS设置base为 X, 如果成功则直接返回,这时候base的值为1。

假如多个线程同时执行add时候,同时执行到casBase则只有一个线程A成功返回,其他线程由于CAS失败执行代码(2),代码(2)是获取cells数组的长度,如果数组长度为0,则执行代码(5),否则cells长度不为0,说明cells数组有元素则执行代码(3),

代码(3)首先计算当前线程在数组中下标,然后获取当前线程对应的cell值,如果获取到则执行(4)进行CAS操作,CAS失败则执行代码(5)。

代码(5)里面是具体进行数组扩充和初始化,这个代码比较复杂,这里就不讲解了,有兴趣的可以进去看看。

LongAccumulator类源码分析

LongAdder类是LongAccumulator的一个特例,LongAccumulator提供了比LongAdder更强大的功能,如下构造函数,其中accumulatorFunction是一个双目运算器接口,根据输入的两个参数返回一个计算值,identity则是LongAccumulator累加器的初始值。

public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) { this.function = accumulatorFunction; base = this.identity = identity; } public interface LongBinaryOperator { //根据两个参数计算返回一个值 long applyAsLong(long left, long right); }

上面提到LongAdder 其实就是LongAccumulator 的一个特例,调用LongAdder 相当使用下面的方式调用 LongAccumulator。

LongAdder adder = new LongAdder(); LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() { @Override public long applyAsLong(long left, long right) { return left + right; } }, 0);

LongAccumulator相比LongAdder 可以提供累加器初始非0值,后者只能默认为0,另外前者还可以指定累加规则,比如不是累加而相乘,只需要构造LongAccumulator 时候传入自定义双目运算器即可,后者则内置累加规则。

从下面代码知道LongAccumulator相比于LongAdde的不同在于casBase的时候,后者传递的是b+x,而前者则是调用了r=function.applyAsLong(b=base.x)来计算。

LongAdder类的add源码如下:

public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }

LongAccumulator的accumulate方法的源码如下:

public void accumulate(long x) { Cell[] as; long b, v, r; int m; Cell a; if ((as = cells) != null || (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = (r = function.applyAsLong(v = a.value, x)) == v || a.cas(v, r))) longAccumulate(x, function, uncontended); } }

另外LongAccumulator调用longAccumulate时候传递的是function,而LongAdder是null,从下面代码可以知道当fn为null,时候就是使用v+x 加法运算,这时候就等价于LongAdder,fn不为null的时候则使用传递的fn函数计算,如果fn为加法则等价于LongAdder;

代码语言:javascript
复制
else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
       // Fall back on using base
      break;
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • LongAccumulator类源码分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档