首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

高并发系列——CAS操作及CPU底层操作解析

CAS(Compare-and-Swap),即比较并替换,是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。CAS也是现在面试经常问的问题,本文将深入的介绍CAS的原理。

一、简单实例

一个方法实现i++时,如下面的getAndIncrement1()方法,多线程调用时,会出现并发安全问题。这时,可以通过synchronized关键字显式加锁保证并发安全。

在JAVA并发包中,提供了很多的Atomic原子类,也可以保证线程安全,比如getAndIncrement2()中的AtomicInteger.getAndIncrement()方法。


package com.wuxiaolong.concurrent;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Description:
 *
 * @author 诸葛小猿
 * @date 2020-09-14
 */
public class Test1 {

    public static int i = 0;

    /**
     * 加锁保证多线程调用的并发安全
     * @return
     */
    public synchronized static int getAndIncrement1(){
        i++;
        return i;
    }

    /**
     * AtomicInteger是线程安全的  
     * @return
     */
    public static int getAndIncrement2(){
        AtomicInteger ai = new AtomicInteger();
        int in = ai.getAndIncrement();
        return in;
    }
}

二、什么是CAS

CAS:compare and swap或compare and exchange,比较和交互。作用是保证在没有锁的情况下,多个线程对一个值的更新是线程安全的。

CAS过程:它包含三个参数CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值,仅当V值等于E值时,才会将V值设置为N值,如果V值和E值不同,说明已经有其他线程做了更新,则当前线程什么都不做。

如果使用CAS实现i++这个操作,可以有如下几步:

1.读取当前i的值(比如i=0),记为E 2.计算i++的值为1,记为V 3.再次读取i的值,记为N。 4.如果E==N,则将i的值更新为1,否则不更新。

CAS的过程很像乐观锁,乐观锁认为发生线程安全问题的概率比较小,所以不用直接加锁,只是更新数据时比较一下原来的数据有没有发生变化。

上面第四步中,如果E==N,并不能说明i的值没有改变过,可能一个线程执行第四步的时候,另一个线程将i改变后又变回来了,对于第一个线程来说,并不知道这个中间过程的存在。这个现象就是ABA问题。

ABA问题如何解决?其实也很简单,给i加一个版本号字段,每次i有变化都对版本号加1,每次更新i的时候除了比较E值,还比较版本号是否一致。这样就解决了ABA问题。在实际开发过程中,如果ABA问题对业务没有影响,就不用考虑这个问题。

三、CAS在AtomicInteger中的使用

CAS的底层实现、synchronized的底层实现、volatile的底层实现都是一样的。我们以上面提到的AtomicInteger类说明。

AtomicInteger是线程安全的,通常说AtomicInteger是无锁或自旋锁。这就是CAS在JDK中的应用。

AtomicInteger.getAndIncrement()方法的源码:


    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

Unsafe.getAndAddInt()源码:


    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

Unsafe.compareAndSwapInt()源码:

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

这个方法是native修饰的,在JDK中就看不到源码实现了。由于java代码是在JVM中执行的,Oracle的JVM是Hotspot, 如果要看native方法的实现,可以找Hotspot的源码,这个源码是C和C++写的。Unsafe.java这个类的源码对应Hotspot源码中unsafe.cpp,这个是C++写的。

四、CAS的底层实现

CAS的底层实现,就必须了解Hotspot的源码。可以在查看OpenJdk的代码,这里就可以找到各种版本的源码。我们以jdk8u为中的unsafe.cpp为例,继续分析compareAndSwapInt方法。


// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/prims/unsafe.cpp

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

可以看到调用到了Atomic::cmpxchg方法,继续分析找到这个方法:


// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/runtime/atomic.cpp

jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
  assert(sizeof(jbyte) == 1, "assumption.");
  uintptr_t dest_addr = (uintptr_t)dest;
  uintptr_t offset = dest_addr % sizeof(jint);
  volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
  jint cur = *dest_int;
  jbyte* cur_as_bytes = (jbyte*)(&cur);
  jint new_val = cur;
  jbyte* new_val_as_bytes = (jbyte*)(&new_val);
  new_val_as_bytes[offset] = exchange_value;
  while (cur_as_bytes[offset] == compare_value) {
    //关键方法
    jint res = cmpxchg(new_val, dest_int, cur); 
    if (res == cur) break;
    cur = res;
    new_val = cur;
    new_val_as_bytes[offset] = exchange_value;
  }
  return cur_as_bytes[offset];
}

各种系统下各种cpu架构,都有相关的实现方法,具体的文件名称如下:


// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/runtime/atomic.inline.hpp

#ifndef SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP
#define SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP

#include "runtime/atomic.hpp"

// Linux
#ifdef TARGET_OS_ARCH_linux_x86
# include "atomic_linux_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_sparc
# include "atomic_linux_sparc.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_zero
# include "atomic_linux_zero.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_arm
# include "atomic_linux_arm.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_linux_ppc
# include "atomic_linux_ppc.inline.hpp"
#endif

// Solaris
#ifdef TARGET_OS_ARCH_solaris_x86
# include "atomic_solaris_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_solaris_sparc
# include "atomic_solaris_sparc.inline.hpp"
#endif

// Windows
#ifdef TARGET_OS_ARCH_windows_x86
# include "atomic_windows_x86.inline.hpp"
#endif

// AIX
#ifdef TARGET_OS_ARCH_aix_ppc
# include "atomic_aix_ppc.inline.hpp"
#endif

// BSD
#ifdef TARGET_OS_ARCH_bsd_x86
# include "atomic_bsd_x86.inline.hpp"
#endif
#ifdef TARGET_OS_ARCH_bsd_zero
# include "atomic_bsd_zero.inline.hpp"
#endif

#endif // SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP
    

在src/oscpu/目录下面有各种系统下各种cpu架构的代码实现,其中src/oscpu/linux_x86/vm下是基于linux下x86架构的代码,cmpxchg方法的最终实现:


// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}    

其中__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"一段代码是核心,asm指汇编语言,是机器语言,直接和cpu交互。

LOCKIFMP指“如果是多个CPU就加锁”,MP指Multi-Processors。程序会根据当前处理器的数量来决定是否为cmpxchg指令添加lock前缀。如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略lock前缀(单个处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。


// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

总结

从上面可以看出,CAS的本质就是:

lock cmpxchg 指令

但是cmpxchg这条cpu指令本身并不是原子性的,还是依赖了前面的lock指令。

完成,收工!

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/5b2731c61bd4e7966c898314d
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券