在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有独占锁.
我们在系列java并发线程实战(1)线程安全和机制原理,已经提到例子,
private volatile int count = 0;
public void doAdd(CountDownLatch countDownLatch) {
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
count++;
}
}
}).start();
}
}
count的输出值结果肯定小于4000。这是因为程序是线程不安全的,所以造成的结果count值可能小于4000;具体分析已经在java并发线程实战(1)线程安全和机制原理 提到过。
其实我们可以使用上Synchronized
同步锁,我们只需要在count++的位置添加同步锁:
synchronized (TestAdd.class){
count++;
}
加了同步锁之后,count自增的操作变成了原子性操作,所以最终的输出一定是count=4000,代码实现了线程安全。
但是Synchronized
虽然确保了线程的安全,但是在性能上却不是最优的,Synchronized
关键字会让没有得到锁资源的线程进入BLOCKED
状态,而后在争夺到锁资源后恢复为RUNNABLE
状态,这个过程中涉及到操作系统用户模式和内核模式的转换,代价比较高。
尽管Java1.6为Synchronized
做了优化,增加了从偏向锁到轻量级锁再到重量级锁的过度,但是在最终转变为重量级锁之后,性能仍然较低。
synchronized独占锁机制存在以下问题:
(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。
volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。
独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。
悲观锁总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。再比如 Java 里面的同步原语 synchronized 关键字的实现也是悲观锁。而另一个更加有效的锁就是乐观锁。
乐观锁:每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于 write_condition 机制,其实都是提供的乐观锁。Java里面的乐观锁用到的机制就是CAS,Compare and Swap,在java.util.concurrent.atomic 包下面的原子变量类就是使用了乐观锁的一种实现方式 CAS 实现的。
我们通过AtomicInteger原子操作类确保原子性操作,
package com.demo.springboot2.web.service;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
public class TestAdd {
private volatile int count = 0;
public static AtomicInteger atomicIntegerCount = new AtomicInteger(0);
public void doAdd(CountDownLatch countDownLatch) {
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
count++;
atomicIntegerCount.incrementAndGet();
}
countDownLatch.countDown();
}
}).start();
}
}
public int getCount() {
return count;
}
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(4);
TestAdd add = new TestAdd();
add.doAdd(countDownLatch);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("volatile count:"+ add.getCount());
System.out.println("atomicIntegerCount:"+ atomicIntegerCount.get());
}
}
atomicIntegerCount结果稳稳的输出4000。而count即使加了volatile也不能保证原子性:
至于为什么AtomicInteger能保证原子性,主要是使用了CAS。
CAS,compare and swap的缩写,中文翻译成比较并交换。
我们都知道,在java语言之前,并发就已经广泛存在并在服务器领域得到了大量的应用。所以硬件厂商老早就在芯片中加入了大量直至并发操作的原语,从而在硬件层面提升效率。在intel的CPU中,使用cmpxchg指令。
在Java发展初期,java语言是不能够利用硬件提供的这些便利来提升系统的性能的。而随着java不断的发展,Java本地方法(JNI)的出现,使得java程序越过JVM直接调用本地方法提供了一种便捷的方式,因而java在并发的手段上也多了起来。
在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。
CAS 操作包含三个操作数 —— 内存位置原数据(V)、旧的预期原值(A)和待修改的新值(B)。
当多个线程同时对某个资源进行CAS操作,只能有一个线程操作成功,但是并不会阻塞其他线程,其他线程只会收到操作失败的信号。可见 CAS 其实是一个乐观锁。
主存中保存V值,线程中要使用V值要先从主存中读取V值到线程的工作内存A中,然后计算后变成B值,最后再把B值写回到内存V值中。多个线程共用V值都是如此操作。CAS的核心是在将B值写入到V之前要比较A值和V值是否相同,如果不相同证明此时V值已经被其他线程改变,重新将V值赋给A,并重新计算得到B,如果相同,则将B值赋给V。
AVA1.5开始引入了CAS,主要代码都放在JUC(JUC就是java.util.concurrent包的简称)的atomic包下,如下图:
JUC的核心就是CAS与AQS。CAS是java.util.concurrent.atomic包的基础,如AtomicInteger、AtomicBoolean、AtomicLong等等类都是基于CAS。
CAS的实现主要在JUC中的atomic包,我们以AtomicInteger类为例,接下来我们来分析一下AtomicInteger类的源码,
看看上面例子的atomicIntegerCount.incrementAndGet();如何实现:
public class AtomicInteger extends Number implements java.io.Serializable {
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// ......
}
从中可以看出JAVA中的CAS操作都是通过sun包下Unsafe类实现,而Unsafe类中的方法都是native方法,由JVM本地实现。
1)、在没有锁的机制下可能需要借助volatile原语,保证线程间的数据是可见的(共享的)
private volatile int value;
这样才获取变量的值的时候才能直接读取。
public final int get() {
return value;
}
2、然后来看看++i是怎么做到的。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
unsafe.getAndAddInt这里采用了Unsafe的compareAndSwapInt的CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。
Unsafe的compareAndSwapInt 是 Native 的方法:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
通过代码追溯,可以看出JAVA中的CAS操作都是通过sun包下Unsafe类实现,而Unsafe类中的方法都是native方法,由JVM本地实现,所以最终的实现是基于C、C++在操作系统之上操作。
Unsafe类,在sun.misc包下,不属于Java标准。Unsafe类提供一系列增加Java语言能力的操作,如内存管理、操作类/对象/变量、多线程同步等。
总结JAVA 的CAS实现:
关于unsafe的cas实现,具体源码没有仔细查看,以下这些是copy网上资料:
Hotspot源码中关于unsafe的实现hotspot\src\share\vm\prims\unsafe.cpp: / unsafe.cpp /* * 这个看起来好像不像一个函数,不过不用担心,不是重点。UNSAFE_ENTRY 和 UNSAFE_END 都是宏, * 在预编译期间会被替换成真正的代码。下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef): * * jni.h * typedef unsigned char jboolean; * typedef unsigned short jchar; * typedef short jshort; * typedef float jfloat; * typedef double jdouble; * * jni_md.h * typedef int jint; * #ifdef _LP64 // 64-bit * typedef long jlong; * #else * typedef long long jlong; * #endif * typedef signed char jbyte; */ UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); // 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger 中的 valueOffset jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); // 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中 return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END // atomic.cpp unsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) { assert(sizeof(unsigned int) == sizeof(jint), "more work to do"); /* * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载 * 函数。相关的预编译逻辑如下: * * atomic.inline.hpp: * #include "runtime/atomic.hpp" * * // Linux * #ifdef TARGET_OS_ARCH_linux_x86 * # include "atomic_linux_x86.inline.hpp" * #endif * * // 省略部分代码 * * // Windows * #ifdef TARGET_OS_ARCH_windows_x86 * # include "atomic_windows_x86.inline.hpp" * #endif * * // BSD * #ifdef TARGET_OS_ARCH_bsd_x86 * # include "atomic_bsd_x86.inline.hpp" * #endif * * 接下来分析 atomic_windows_x86.inline.hpp 中的 cmpxchg 函数实现 */ return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value); } 上面的分析看起来比较多,不过主流程并不复杂。如果不纠结于代码细节,还是比较容易看懂的。接下来,我会分析 Windows 平台下的 Atomic::cmpxchg 函数。继续往下看吧。 // atomic_windows_x86.inline.hpp #define LOCK_IF_MP(mp) __asm cmp mp, 0 \ __asm je L0 \ __asm _emit 0xF0 \ __asm L0: inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange int mp = os::is_MP(); __asm { mov edx, dest mov ecx, exchange_value mov eax, compare_value LOCK_IF_MP(mp) cmpxchg dword ptr edx, ecx } } 上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下: inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // 判断是否是多核 CPU int mp = os::is_MP(); __asm { // 将参数值放入寄存器中 mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中 mov ecx, exchange_value mov eax, compare_value // LOCK_IF_MP cmp mp, 0 /* * 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处, * 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令 * 前加 lock 前缀。 */ je L0 /* * 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的 * 原因可以参考知乎的一个回答: * https://www.zhihu.com/question/50878124/answer/123099923 */ _emit 0xF0 L0: /* * 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释: * cmpxchg: 即“比较并交换”指令 * dword: 全称是 double word,在 x86/x64 体系中,一个 * word = 2 byte,dword = 4 byte = 32 bit * ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元 * edx: ... 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。 * 那么 edx 表示内存地址为 dest 的内存单元 * * 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 edx 双字内存单元中的值 * 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 edx 内存单元中。 */ cmpxchg dword ptr edx, ecx } } 到这里 CAS 的实现过程就讲完了,CAS 的实现离不开处理器的支持。以上这么多代码,其实核心代码就是一条带lock 前缀的 cmpxchg 指令,即
lock cmpxchg dword ptr [edx], ecx
。
CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作
CAS 由三个步骤组成,分别是“读取->比较->写回”。 考虑这样一种情况,线程1和线程2同时执行 CAS 逻辑,两个线程的执行顺序如下:
时刻1:线程1执行读取操作,获取原值 A,然后线程被切换走undefined 时刻2:线程2执行完成 CAS 操作将原值由 A 修改为 Bundefined 时刻3:线程2再次执行 CAS 操作,并将原值由 B 修改为 Aundefined 时刻4:线程1恢复运行,将比较值(compareValue)与原值(oldValue)进行比较,发现两个值相等。undefined 然后用新值(newValue)写入内存中,完成 CAS 操作
如上流程,线程1并不知道原值已经被修改过了,在它看来并没什么变化,所以它会继续往下执行流程。对于 ABA 问题,通常的处理措施是对每一次 CAS 操作设置版本号。
从Java1.5 开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
ABA问题的解决办法
1.在变量前面追加版本号:每次变量更新就把版本号加1,则A-B-A就变成1A-2B-3A。 2.atomic包下的AtomicStampedReference类:其compareAndSet方法首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用的该标志的值设置为给定的更新值。
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。
比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
Java的concurrent包下就有很多类似的实现类,如Atomic开头那些。
public class SpinLock {
private AtomicReference<Thread> sign =new AtomicReference<>();
public void lock(){
Thread current = Thread.currentThread();
while(!sign .compareAndSet(null, current)){
}
}
public void unlock (){
Thread current = Thread.currentThread();
sign .compareAndSet(current, null);
}
}
所谓自旋锁,我觉得这个名字相当的形象,在lock()的时候,一直while()循环,直到 cas 操作成功为止。
AtomicInteger 的 incrementAndGet()与自旋锁有异曲同工之妙,就是一直while,直到操作成功为止。
所谓令牌桶限流器,就是系统以恒定的速度向桶内增加令牌。每次请求前从令牌桶里面获取令牌。如果获取到令牌就才可以进行访问。当令牌桶内没有令牌的时候,拒绝提供服务。我们来看看 eureka
的限流器是如何使用 CAS 来维护多线程环境下对 token 的增加和分发的。
public class RateLimiter {
private final long rateToMsConversion;
private final AtomicInteger consumedTokens = new AtomicInteger();
private final AtomicLong lastRefillTime = new AtomicLong(0);
@Deprecated
public RateLimiter() {
this(TimeUnit.SECONDS);
}
public RateLimiter(TimeUnit averageRateUnit) {
switch (averageRateUnit) {
case SECONDS:
rateToMsConversion = 1000;
break;
case MINUTES:
rateToMsConversion = 60 * 1000;
break;
default:
throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
}
}
//提供给外界获取 token 的方法
public boolean acquire(int burstSize, long averageRate) {
return acquire(burstSize, averageRate, System.currentTimeMillis());
}
public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
return true;
}
//添加token
refillToken(burstSize, averageRate, currentTimeMillis);
//消费token
return consumeToken(burstSize);
}
private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
long refillTime = lastRefillTime.get();
long timeDelta = currentTimeMillis - refillTime;
//根据频率计算需要增加多少 token
long newTokens = timeDelta * averageRate / rateToMsConversion;
if (newTokens > 0) {
long newRefillTime = refillTime == 0
? currentTimeMillis
: refillTime + newTokens * rateToMsConversion / averageRate;
// CAS 保证有且仅有一个线程进入填充
if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
while (true) {
int currentLevel = consumedTokens.get();
int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
// while true 直到更新成功为止
if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
return;
}
}
}
}
}
private boolean consumeToken(int burstSize) {
while (true) {
int currentLevel = consumedTokens.get();
if (currentLevel >= burstSize) {
return false;
}
// while true 直到没有token 或者 获取到为止
if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
return true;
}
}
}
public void reset() {
consumedTokens.set(0);
lastRefillTime.set(0);
}
}
所以梳理一下 CAS 在令牌桶限流器的作用。就是保证在多线程情况下,不阻塞线程的填充token 和消费token。
本文是认真看过cas相关源码,然后把查阅网上相关文章进行总结。