AQS 抽象队列同步器(AbstractQueuedSynchronizer) 作为Java并发库的基石,像ReentrantLock,ThreadPoolExecutor,Semaphore等类都使用到AQS完成线程间同步,本文主要来和大家分享一下我自己对AQS实现和设计理念的一些新的思考。
AQS(抽象队列同步器)核心由两部分组成:
条件变量很简单,所有场景下的条件变量都可以泛化为一个整数值,因此AQS使用一个整数来表示条件变量:
private volatile int state;
考虑到等待线程数量未知,并且经常发生出入队动作,因此AQS队列采用链表结构实现,那么下面我们首先来看看AQS针对的哪些场景下的问题。
AQS需要解决哪些场景下的问题 , 泛化而言有五个方面:
- 互斥模式下 ,state == 1 表示资源被占有 ,state == 0 表示资源空闲 , state > 1 表示可重入情况
如果大家仔细观察会发现以上四种模式本质都是条件变量的应用,不同之处在于对何时满足条件这个定义不同,因此AQS也支持第五种模式条件变量,当以上四种常见均不能满足你的需求时,由你自己设计满足条件的定义是什么。
关于条件变量的实现,这里我想特别展开说明一点: 为什么条件变量需要锁的保护 ?大家可以反向思考一下,如果没有锁的保护,条件变量的使用会出现什么问题 ?
如果不使用锁来保护你正在等待的数据,就会出现虚假唤醒的问题,这个问题出现的本质是因为第1步和第4步之间存在一个时间窗口,在这个时间窗口内,如果线程2执行notify操作,那么将使得线程1错过唤醒机会,从而出现Lost Wakeup的问题。
解决Lost Wakeup问题的方法就是使用锁来保护等待的数据,也就是条件变量,如下图所示:
当然,如果这边继续深入下去的话,大家还可以思考一点: 条件变量和锁有什么联系 ? 两者表现行为都是在条件不满足时阻塞,条件满足时被唤醒,所以泛化来看,锁本质也是条件变量的一种应用,那么锁的使用上也存在虚假唤醒问题吗?
首先,如果按照问题所述,显然是存在虚假唤醒问题的,如上图所述,但是这里的错误在于问题混淆了锁的概念,严格来说只存在两种锁:
对于睡眠锁而言,其本质就是条件变量的一种应用,所以存在虚假唤醒问题,需要解决。而自旋锁这是通过不断CAS+重试来实现锁,对于自旋锁而言是不存在虚假唤醒问题的:
所以为了解决睡眠锁可能存在的虚假唤醒问题,我们需要使用自旋锁来保证睡眠锁的条件变量example的安全性,添加了自旋锁保护的睡眠锁实现如下:
相信经过了上面的讲解,大家已经理解了为什么条件变量需要锁的保护了,我们常说的锁其实属于睡眠锁,睡眠锁本质也是对条件变量的一种实现,那么当某个线程获取锁失败后,需要进入锁队列中挂起等待,如下图所示:
如果我们使用锁来保护条件变量,此时成功获取了锁,但是发现条件不满足的线程又会进入条件变量关联的队列中阻塞等待,如下图所示:
条件变量需要关联一个条件队列,睡眠锁本质也是条件变量的应用,因此上面说的锁队列本质也是条件队列,而锁保护的条件变量关联的队列我们也称之为条件队列,因此为了区分,本文后面都将称前者为锁队列。
还有一点很重要,就是条件变量只能在互斥锁保护下使用,共享锁模式下不能使用条件变量,否则还是会存在虚假唤醒问题:
上面已经铺垫了很多前置知识,对于AQS来说,其支持互斥锁,共享锁和多条件变量三大特性,因此其内部会存在两种类型队列,一种就是锁队列,还有一种就是条件变量队列,如果存在多个条件变量,那么就会存在多个条件队列,具体如下图所示:
下面我们从源码角度来分析一下五种模式的不同实现。
本文代码均基于JDK 11进行讲解。
当AQS运行在互斥模式下时,Node节点结构如下所示:
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
// 省去互斥模式下使用不到的相关属性
...
}
其中waitStatus属性记录当前节点处于何种状态下,互斥模式下节点只可能存在三种状态:
互斥模式下释放锁时会通过头结点的状态值判断当前锁队列是否还存在阻塞的线程,如果头结点状态值为0,表明当前头结点没有后继节点需要唤醒了,如果头结点值为1,表明头结点存在后继节点需要唤醒,因此我们需要对头结点的状态值变更尤其关注。
互斥模式的实现,这里以ReentrantLock为例进行说明。
public final void acquire(int arg) {
// tryAcquire: 不同模式下对获取锁成功的定义是不同的,对于互斥锁实现而言,state作为一个二元值,0表示锁空闲,1表示锁占用
if (!tryAcquire(arg) &&
// 如果获取锁失败,则进入锁队列进行排队等候,返回值代表等待过程中是否被打断过
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
不同模式下对获取锁成功的定义是不同的,因此AQS把tryAcquire方法的实现交由子类决定,对于ReentrantLock实现而言,state作为一个二元值,0表示锁空闲 , 1表示锁占用 , 大于1表示发生可重入:
下面以ReentrantLock公平锁为例进行讲解,公平体现在线程获取锁时,是否先看一眼锁队列是否已经有人在排队中,如果是,则自己排到锁队列末尾等候。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平体现在每次抢锁前,先看一眼是否有人在等着了
if (!hasQueuedPredecessors() &&
// 如果锁队列为空,那么尝试去获取锁
compareAndSetState(0, acquires)) {
// 获取锁成功,设置当前锁的占有者为当前线程自己
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是否是锁重入
else if (current == getExclusiveOwnerThread()) {
// 累加锁重入计数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
由于ReentrantLock支持锁重入,所以实际的state并不只有0和1两种状态。
如果获取锁失败,当前线程进入锁队列排队等候,首先第一步新为当前线程新创建一个Node并添加到锁队列末尾:
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 为当前线程创建一个Node,此处Node为独占模式
for (;;) {
// 锁队列是全局共享资源,因此这里采用CAS+重试确保尾插操作的原子性
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 初始化锁队列
initializeSyncQueue();
}
}
}
第二步是把当前线程给挂起:,acquireQueued函数不仅包含把当前线程挂起的逻辑,还包含被唤醒后尝试抢锁的逻辑,如果失败继续挂起
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
// 获取当前线程前置节点
final Node p = node.predecessor();
// 前置节点是头结点才有资格去抢锁
if (p == head && tryAcquire(arg)) {
// 抢锁成功,设置当前节点为头结点
// 这里的头结点可以看做是Dummy Node
setHead(node);
p.next = null; // help GC
// 阻塞在锁队列等待锁的过程中是否被打断过
return interrupted;
}
// 根据前驱节点状态,判断是否需要阻塞等待,还是再尝试获取一次锁
if (shouldParkAfterFailedAcquire(p, node))
// 挂起当前线程
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 挂起线程和唤醒后抢锁的逻辑过程中如果发生异常,则将当前Node状态设置为CANCEL
cancelAcquire(node);
// 如果以上过程中线程被打断过,那么这里补充一次自我打断
if (interrupted)
selfInterrupt();
throw t;
}
}
shouldParkAfterFailedAcquire函数根据节点的不同状态,判断当前线程自身是应该立即挂起,还是再尝试获取一次锁:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 当前线程在将头结点或者尾结点状态由0设置为1后,再次尝试了抢锁,但是还是抢失败了,因此当前线程应该立即阻塞
if (ws == Node.SIGNAL)
return true; // 当前线程应该阻塞
// 前置节点处于取消状态,此处从前置节点开始往前扫描,将所有处于取消状态的节点都移出锁队列
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此处waitStatues的值可能为0或者PROPAGATE(共享模式下节点才会存在该状态,此处不关心)
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
AQS队列采用双向链表实现,其中prev指针就用在对Cancel节点的回收中,其他场景不会用到。
parkAndCheckInterrupt方法将当前线程挂起,被唤醒后返回当前线程是否被打断过:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
interrupted方法会清空打断标记,因此如果必要可以在被唤醒并成功抢到锁后补充一次自我打断的逻辑。
release方法用于释放锁:
public final boolean release(int arg) {
// 由子类实现决定本次锁释放是否成功
if (tryRelease(arg)) {
Node h = head;
// 锁队列刚初始化的时候,头结点的waitStatus=0
// 但是当第一个Node入队后,头结点的waitStatus应该等于SIGNAL,表明当前头结点有后继节点需要唤醒
if (h != null && h.waitStatus != 0)
// 说明锁队列不为空,此时唤醒后继结点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease方法尝试去释放锁,不同模式下对锁被成功释放的定义不同,因此这个方法需要子类继承实现,此处给出ReentrantLock的实现:
protected final boolean tryRelease(int releases) {
// 由于存在可重入清空,因此只有state递减为0时,才算锁被释放成功
int c = getState() - releases;
// 非法情况,只有持有锁的线程才能释放当前锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state递减为0的时候,才算锁被释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor方法唤醒某个节点的后继结点,这里是唤醒头结点的后继结点:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 将头结点状态值设置为0
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 获取后继节点
Node s = node.next;
// 如果当前节点没有后继节点或者后继节点处于取消状态,则向后寻找到第一个没有取消的节点
// 然后唤醒该节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 唤醒节点
if (s != null)
LockSupport.unpark(s.thread);
}
如果被唤醒的线程抢锁成功,则会在acquireQueued方法的for循环中将自己设置为新的头结点,此时新的头结点的状态值可能为SIGNAL或者0, 如果是SIGNAL说明当前节点还存在后继节点需要唤醒,否则说明当前节点是尾结点。
如果被唤醒的线程抢锁失败,则会在shouldParkAfterFailedAcquire方法中再次将头结点的状态设置为SIGNAL,因为当前头结点抢锁失败了。
AQS互斥模式的实现到此就讲解完毕了,整体而言还是很简单的,这里简单总结一下互斥模式下抢锁和释放锁的步骤 ,首先是抢锁步骤:
其次是释放锁步骤:
上面总结了一下AQS互斥模式的运行流程,下面我们来看看AQS互斥模式的实现过程中有哪些值得我们学习的并发设计技巧:
当AQS运行在共享模式下时,此时Node节点的结构如下所示:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用
// 所以这里改名方便下面理解
Node shared;
final boolean isShared() {
return shared == SHARED;
}
// 省去共享模式下使用不到的相关属性
...
}
相比于互斥模式,这里节点增加了PROPAGATE状态,并且还增加了shared属性来表明当前阻塞节点是处在共享模式下,还是互斥模式下。
其实共享模式下用不到shared属性,该属性是应用在混合模式下,但是这里为了下面讲解源码方便理解,就提前拿出来说了。
PROPAGATE状态主要是为了解决并发情况下的唤醒丢失问题而提出的,下面会专门有一小节来讲解该状态的作用,这里暂不做解释。
共享模式的实现这里以Semaphore为例进行说明。
Semaphore获取资源的方法是acquireSharedInterruptibly:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程被打断了,则抛出打断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试去获取共享资源 -- 如何才算获取共享资源成功,由子类实现决定
if (tryAcquireShared(arg) < 0)
// 资源数量不足,则进入阻塞阶段
doAcquireSharedInterruptibly(arg);
}
由子类实现决定剩余资源够不够分配给当前线程:
protected int tryAcquireShared(int acquires) {
for (;;) {
// Semaphore获取资源时也有公平和非公平两种模式,这里展示的是公平模式的实现
// 所谓公平就是抢夺资源前先看看有没有人在排队
if (hasQueuedPredecessors())
return -1;
// remaining小于0则表示资源不足,对比传统信号量定义中可能表示等待资源的线程数量
// 如果remaining>=0 说明资源还够,那么当前线程就可以获取到资源,无需阻塞等待了
// note: remaining是减去当前线程所需资源后的剩余资源数量
int available = getState();
int remaining = available - acquires;
// 如果剩余资源不足,则直接返回对应负值,表示资源不足需要等待
if (remaining < 0 ||
// 扣减完后,剩余资源数量大于等于0,则使用cas配合重试确保原子性扣减资源数量成功
compareAndSetState(available, remaining))
return remaining;
}
}
如果剩余资源不足,当前线程需要进入资源队列进行等待,这段逻辑存在于doAcquireSharedInterruptibly方法中,当然该方法还包含线程被唤醒后再次尝试抢锁的逻辑:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 为当前线程创建Node节点尾插到资源队列尾部,节点模式是共享模式
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 判断自己是否是头结点的后继节点
final Node p = node.predecessor();
if (p == head) {
// 如果是头结点的后继节点,则尝试再次去获取资源
int r = tryAcquireShared(arg);
// 与互斥模式不同的一点在于,如果当前线程获取资源成功,那么它会唤醒其后继节点继续尝试获取资源
// 后继节点获取成功后,再唤醒它的后继节点,直到资源不足,则停止唤醒
if (r >= 0) {
// 如果剩余资源数量为0,也会进入到该方法,但是不会执行链式唤醒过程
// 只是把当前节点更新为新的头节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 根据前驱节点状态判断是否应该阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果当前线程是被中断唤醒的,那么直接抛出中断异常
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
// 将当前节点状态设置为取消状态
cancelAcquire(node);
throw t;
}
}
与互斥模式不同的一点在于,如果共享模式下线程被唤醒后成功获取到资源,当前线程会接着唤醒其后继节点,让其继续获取资源,后继节点在发现还有资源的情况下,继续唤醒其后继节点,直到资源不足,停止继续往后唤醒,整个过程实际是一个链式唤醒的过程。
下面我们进入释放共享资源小节,来看看链式唤醒的整个过程是如何发生的。
releaseShared方法负责完成对资源的释放,释放完资源后唤醒等待中的线程:
public final boolean releaseShared(int arg) {
// 资源怎样算释放成功这是由子类决定的
if (tryReleaseShared(arg)) {
// 如果资源释放成功,则进入链式唤醒过程
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法中采用cas+重试确保资源数原子性累加成功:
protected final boolean tryReleaseShared(int releases) {
// 因为可能存在多个线程同时尝试归还资源,因此这里为了确保能够原子性累加成功
// 采用cas+重试机制实现
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared方法开启链式唤醒过程:
private void doReleaseShared() {
for (;;) {
Node h = head;
// 确保资源队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 确保头结点存在后继节点
if (ws == Node.SIGNAL) {
// 设置头结点的状态值为0,如果cas设置失败,说明可能存在多个线程同时尝试开启链式唤醒
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒头结点的后继节点
unparkSuccessor(h);
}
// 存在并发唤醒的情况处理 --- 具体可以看唤醒丢失小节
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 头结点没有变动的情况下,说明没有竞争唤醒问题,则跳出循环
// 否则尝试继续唤醒变动后的头结点的下一个后继节点
if (h == head) // loop if head changed
break;
}
}
被唤醒的线程会在doAcquireSharedInterruptibly方法的for循环中继续尝试去获取资源,如果获取成功,并且还有剩余资源,则唤醒自己的后继节点:
private void setHeadAndPropagate(Node node, int propagate) {
// 更新当前成功获取资源的节点为新的头结点
Node h = head;
setHead(node);
// propagate代表剩余资源剩余数量,只有在还有剩余资源的情况下才会去唤醒自己的后继节点
// h保留的是旧的头结点,如果旧的头结点状态值为PROPAGATE,则再尝试唤醒其后继节点,去获取一下资源试试
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果当前节点是链表尾节点,或者后继节点是共享类型节点,则尝试进行唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
如果被唤醒的线程获取资源失败,则会进入shouldParkAfterFailedAcquire将头结点状态由0或者PROPAGATE更新为SIGNAL,然后自己再尝试获取一次资源:
如果不考虑并发唤醒的问题,那么整个链式唤醒过程如下图所示:
并发唤醒会使得整个问题复杂起来,为了解决这个问题AQS引入了PROPAGATE状态,在JDK 6之前是没有这个状态的,当时的Semphore实现存在一个bug,如下图所示:
至此,t2 永远不会被唤醒,问题产生。
上述bug产生的原因就是因为共享锁的获取和释放在同一时刻很可能会有多条线程并发执行,这就导致在这个过程中可能会产生这种 waitStatus 为 0 的中间状态,而AQS会将waitStatus状态值为0看做是资源队列为空的信号,因此此时产生的尴尬问题就是明明还有资源,但是确无法唤醒等待队列中的线程,因此通过引入 PROPAGATE 状态来解决这个问题。
为了解决这个问题为头结点引入了PROPAGATE状态,此时我们再来看引入PROPAGATE状态后的流程是怎样的:
后继节点 t2 被唤醒,问题解决。
引入 PROPAGATE 状态还有一个好处:加速唤醒后继节点。
doReleaseShared
方法中有这个条件判断:
if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
如果没有 PROPAGATE
状态,当多条线程同时运行到这里后,可能就直接退出了,虽然这时有个线程正在调用 unparkSuccessor
方法去唤醒后继节点,但唤醒后的线程也需要等到获取到锁且成为头节点后才能调用 doReleaseShared
方法再去唤醒后继节点。
当并发大时,在这个过程中很有可能会有新节点入队并满足唤醒条件,所以有了 PROPAGATE
状态,当多条线程同时运行到这里后,CAS 失败后的线程可以再次去循环判断能否唤醒后继节点,如果满足唤醒条件就去唤醒。
毕竟,调用 doReleaseShared
方法越多、越早就越有可能更快的唤醒后继节点。
线程t6先入队阻塞,然后线程t4和t5释放资源,如果没有上面那句判断,那么就不会将头结点设置为PROPAGATE状态,也就不会去唤醒t2和t6了。
AQS共享模式的实现到此就讲解完毕了,整体而言,除了链式唤醒那块比较难搞之外,其他部分还是很容易理解的,下面简单总结一下共享模式下抢夺资源和释放资源的步骤,首先是抢夺资源的步骤:
其次是释放锁步骤:
链式唤醒过程的讲述不包含并发唤醒处理步骤,这一块比较复杂,大家可以参考唤醒丢失小节自行总结。
共享模式的实现中用到的并发技巧还是CAS+重试,全程并没有使用到睡眠锁。
当AQS运行在混合模式下时,此时Node节点的结构如下图所示:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用
// 所以这里改名方便下面理解
Node shared;
final boolean isShared() {
return shared == SHARED;
}
// 省去混合模式下使用不到的相关属性
...
}
混合模式的实现这里以ReentrantReadWriteLock读写锁为例进行说明。
读写锁场景下,我们主要需要关心读写互斥的问题,首先读读肯定是并发的,但是读写,写写操作必须是互斥的;还有几个问题我们也需要考虑一下:
这里我先回答第2和第3个问题:
关于第一个问题的答案,我们将在接下来的代码解析环节揭晓。
我们首先来看读写锁实现中获取写锁的方法实现:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AQS提供的acquire模版方法就不多讲了,大家可以自行回忆整个过程,其中我们来看看读写锁是如何判断写锁是否获取成功的这个过程:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// state的前半部分记录读锁数量,后半部分记录写锁数量
int c = getState();
int w = exclusiveCount(c);
// 如果存在读锁或者写锁,c!=0的条件都会满足
if (c != 0) {
// 只有存在写锁并且是当前线程持有的写锁,下面这个条件才不会满足
// 否则说明当前写锁获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果是当前线程自己的写锁重入,则判断是否重入次数过多
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 累加重入次数
setState(c + acquires);
return true;
}
// 此时不存在任何锁,那么为什么还要判断当前写锁是否应该阻塞呢?
// 因为如果多个线程同时执行到当前逻辑,那么可能已经有线程已经抢到锁了,然后某几个线程已经入队等候了
// 当前线程执行到此处时发生上下文切换,再次切换回来执行时,正好碰上锁被释放
// 判断逻辑: 当前是公平还是非公平锁,如果是公平锁,则看是否有人排队等候锁,否则该函数永远返回false
if (writerShouldBlock() ||
// cas尝试去获取锁,如果失败了则返回false
!compareAndSetState(c, c + acquires))
return false;
// 抢锁成功,设置锁的占有线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
如果当前存在锁,并且不属于自身写锁可重入场景,则表明抢锁失败,当前线程需要入队阻塞。
如果不存在锁,则CAS尝试抢锁,失败了还是入队阻塞。
关于判断写锁是否应该阻塞的公平和非公平实现如下:
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
...
}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
...
}
读写锁实现中释放写锁的代码如下所示:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
AQS提供的release模版方法这里不再多说,下面来看看tryRelease方法是如何完成写锁的释放流程的:
protected final boolean tryRelease(int releases) {
// 要确保持有锁的线程是自己
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 判断写锁计数递减后是否为0
boolean free = exclusiveCount(nextc) == 0;
// 不为0说明存在锁重入
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读写锁实现中获取读锁的代码如下所示:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
AQS提供的acquireShared模版方法这里不再多说,下面来看看tryAcquireShared方法是如何完成读锁的获取流程的:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 考虑读锁升级写锁场景: 如果当前存在写锁并且不是当前线程只有的写锁,那么获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁个数
int r = sharedCount(c);
// 如果当前读操作需要被阻塞,那么需要到fullTryAcquireShared函数中检查是否真的需要阻塞
if (!readerShouldBlock() &&
// 确保读锁数量没有达到上限
r < MAX_COUNT &&
// 尝试累加全局读锁计数
compareAndSetState(c, c + SHARED_UNIT)) {
// 首个获取读锁的线程单独记录
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 其他获取读锁的线程使用ThreadLocal记录其获取的读锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
// cachedHoldCounter记录最新一次获取读锁的线程的HoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 这里存在三种情况会进入下面这个方法的逻辑:
// 1. readerShouldBlock方法判断当前读锁获取应该阻塞等待
// 2. 读锁获取数量达到最大值
// 3. 尝试原子性累加全局读锁计数失败
return fullTryAcquireShared(current);
}
fullTryAcquireShared方法负责处理以下几种场景:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
//===================阶段1: 前置检查 =======================
int c = getState();
// 如果发现写锁不为空,要确保持有写锁的线程始终是当前线程自己,否则获取读锁失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 判断当前线程是否需要入队等待
} else if (readerShouldBlock()) {
// 如果当前线程还没有持有读锁,那么情况当前线程的threadLocal本地缓存中记录的读取计数器
if (firstReader != current) {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前线程不持有任何锁,则返回-1,表示当前线程需要入队阻塞
// 如果当前线程持有读锁,那么即便readerShouldBlock返回true,这里也不会阻塞
// 为了防止死锁的发生
if (rh.count == 0)
return -1;
}
}
// 如果读锁获取的总次数达到最大限制,则抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//===================阶段2: 累加计数器=======================
// 累加全局读锁计数
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 说明当前线程是第一个获取读锁的线程
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 处理获取读锁的首位线程的读锁重入
firstReaderHoldCount++;
} else {
// 普通情况处理
if (rh == null)
// 先获取最近一次获取读锁的线程的HoldCounter
rh = cachedHoldCounter;
// 如果最近一次获取读锁的线程记录不存在,或者其记录的不是当前线程
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
// 从线程自己的本地缓存获取自己的HoldCounter
rh = readHolds.get();
// 如果最近一次获取读锁的线程就是当前线程,那么累加当前线程本地读锁计数器
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
// 记录最近一次获取读锁的线程的读锁计数器
cachedHoldCounter = rh; // cache for release
}
// 返回1表示获取读锁成功
return 1;
}
}
}
fullTryAcquireShared方法大体分为两个阶段,前置检查和累加计数,其中前置检查阶段需要特别注意一点:
关于读锁是否应该阻塞这个问题,其公平版本实现还是判断等待队列中是否存在线程正在等待,这一点和之前一样,关键是其非公平版本实现,判断逻辑:
static final class NonfairSync extends Sync {
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
关于读锁的非公平版本实现主要是考虑到写锁的饥饿性问题:
大家可以回想一下本篇开始提出的问题一,相信大家已经都知晓答案了。
读写锁实现中释放读锁的代码如下所示:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
AQS提供的releaseShared模版方法这里不再多说,下面来看看tryReleaseShared方法是如何完成读锁的释放流程的:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果当前线程是第一个获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果读锁没有出现可重入获取,直接将firstReader置空
if (firstReaderHoldCount == 1)
firstReader = null;
else
// 递减读锁获取次数,等到为0时,才算读锁被真正释放
firstReaderHoldCount--;
} else {
// 通过缓存拿到最近一次获取读锁的线程的HoldCounter计数
HoldCounter rh = cachedHoldCounter;
// 如果缓存中没有,或者缓存记录的不是当前线程的读计数器,那么从当前线程本地缓存中获取
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
// 如果读锁获取计数器=1,则直接释放当前读锁
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 否则递减计数器
--rh.count;
}
// 通过cas加重试确保全局计数器递减成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// nextc == 0 条件满足说明读锁和写锁计数器都为0
return nextc == 0;
}
}
关于混合模式的实现,其本身结合使用了互斥模式和共享模式,因此锁队列中既存在类型为独占模式的Node,也存在类型为共享模式的Node,如下图所示:
之前在共享模式篇说过,共享锁的释放流程走的是链式唤醒流程,而在混合模式下,如果链式唤醒过程中遇到了独占节点,也会提前终止链式唤醒流程:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 唤醒的前提是后继节点必须是共享节点,而不是独占节点
if (s == null || s.isShared())
doReleaseShared();
}
}
当AQS处在栅栏模式下时,此时state表示剩余未到达栅栏处的线程数,先到达栅栏处的线程需要等待直到剩余线程都达到栅栏 , 当state=0时,说明所有线程都到达栅栏处,此时打开栅栏,即唤醒所有线程继续执行
当AQS运行在栅栏模式下时,此时Node节点的结构如下所示:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int PROPAGATE = -3;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用
// 所以这里改名方便下面理解
Node shared;
final boolean isShared() {
return shared == SHARED;
}
// 省去栅栏模式下使用不到的相关属性
...
}
栅栏模式这里以CountDownLatch为例进行说明,CountDownLatch的构造函数需要传入一个count值表明参与本轮任务的线程数量:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
严格来说CyclicBarrier才是栅栏模式的真正实现,CountDownLatch只是栅栏模式的简化版本,和栅栏模式稍有区别在于,CountDownLatch只有一轮循环,也就是所有线程到达栅栏处后,就结束所有线程的执行;而CyclicBarrier会在本轮循环结束后,重置计数器,开启下一轮循环。
主线程调用await方法等待所有线程完成任务到达栅栏处:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
acquireSharedInterruptibly这个模版方法就不多说了,tryAcquireShared方法实现也很简单,当state递减为0时,表明所有线程都已经到达栅栏处了,本轮结束,主线程可以继续往下执行了:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
当前线程完成自己的任务后,递减计数:
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared模版方法这里就不多说了,tryReleaseShared的实现也很简单,通过cas配合重试递减计数器的值,如果计数器的值被递减到0了,说明当前线程是最后一个到达栅栏处的,接下来就可以唤醒等待中的主线程了:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
当AQS运行在条件变量模式下时,此时Node节点的结构如下所示:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int PROPAGATE = -3;
static final int CONDITION = -2;
volatile Node prev;
volatile Node next;
volatile int waitStatus;
// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用
// 所以这里改名方便下面理解
Node shared;
final boolean isShared() {
return shared == SHARED;
}
// 如果当前Node处在条件队列中,那么该属性链接当前节点在队列中的下一个节点
Node nextWaiter;
// 省去共享模式下使用不到的相关属性
...
}
条件变量模式大家需要注意两点:
这两套队列复用一套Node结构
此处还未Node节点新增一个状态CONDITION,此状态用于描述处于条件队列中的节点。
这里以ReentrantLock中的ConditionObject实现来看看条件变量在AQS中是怎么玩的。
await方法为当前线程创建node节点,并加入对应的条件队列中等待,直到被signal唤醒或者中断唤醒才会结束等待状态:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 为当前线程创建一个Node节点,然后尾插到条件队列尾部
Node node = addConditionWaiter();
// 因为当前线程需要在条件队列中挂起等待,所以挂起前需要完全释放掉当前线程持有的锁
// 返回值是阻塞前持有的锁计数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 当条件成立时,会将当前node节点从条件队列移入锁队列
while (!isOnSyncQueue(node)) {
// 挂起当前线程,等待条件成立
LockSupport.park(this);
// 被唤醒后检查是否是被中断唤醒的,如果是被中断唤醒的并且转移节点到锁队列成功,返回-1,并跳出循环等待
// 如果是被中断唤醒的,但是cas转移节点到锁队列失败了,此时返回1,并跳出循环等待
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当前节点已经转移到锁队尾排队等候锁了,此时调用acquireQueued把当前线程重新挂起
// 等待被唤醒后抢锁,如果失败,继续挂起,直到抢锁成功为止
// acquireQueued方法返回值表示等待锁期间是否发生了中断,如果发生了则记录中断标记为REINTERRUPT
// 如果阻塞前线程发生了可重入,并重入了3次,那么此处抢锁要确保初始锁计数为3
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 条件满足,当前线程被唤醒,并且唤醒后抢锁成功
// 先清理一下队列中处于取消等待状态的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 在条件队列上等待期间,或者锁队列上等待期间发生了中断,则对中断进行分类处理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法负责为当前线程创建一个Node节点,然后尾插到条件队列尾部:
private Node addConditionWaiter() {
// 确保当前线程持有保护当前条件变量的互斥锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 清理一下条件队列中处于取消状态的Node节点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 负责遍历整个条件队列链表,删除掉所有处于CANCEL状态的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 为即将入队的当前线程创建一个Node节点尾插到当前条件队列中
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullyRelease方法负责完成释放掉当前线程持有的所有锁:
final int fullyRelease(Node node) {
try {
// 返回先前持有的锁计数
int savedState = getState();
// 释放掉当前线程持有的锁,如果是可重入场景,也是全部释放掉
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
checkInterruptWhileWaiting方法在被唤醒后检查是否是被中断唤醒的,如果是说明条件满足被signal唤醒了,然后调用transferAfterCancelledWait将当前线程从条件队列转移到锁队列等候:
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 将当前节点状态值设置为0
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
// 放入到锁队列尾部
enq(node);
// 此处返回true,表示是由于中断唤醒导致当前线程由条件队列转移到了锁队列中
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 如果上面cas失败,说明SIGNAL操作和中断同时发生,此时被中断唤醒的线程会不断轮询直到SIGNAL操作将节点成功转移到锁队列
while (!isOnSyncQueue(node))
Thread.yield();
// 此处返回false,此时虽然发生了中断,但是最终不是因为中断返回被唤醒,而是signal操作率先完成了唤醒
return false;
}
reportInterruptAfterWait方法负责判断当前是应该发生一次自我打断还是应该直接抛出中断异常:
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
当条件满足时,调用signal方法唤醒条件队列中第一个节点对应的线程:
public final void signal() {
// 确保当前线程持有保护条件变量的互斥锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒条件队列中等待的所有线程
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal方法负责遍历条件队列中每个节点,每遍历到一个节点先检查是否是被CANCEL了的节点,如果是则跳过,继续遍历下一个节点,如果不是则将当前节点从条件队列转移到锁队列中,并唤醒对应的线程,然后结束返回:
private void doSignal(Node first) {
do {
// 当前节点移出条件队列
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal方法负责将当前节点加入锁队列中,同时恢复挂起了的线程:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 如果cas失败,说明当前Node已经被取消了
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将当前节点加入锁队列尾部--返回旧的尾节点
Node p = enq(node);
int ws = p.waitStatus;
// 将旧的尾节点的状态值设置为SIGNAL,因为其存在新的后继节点需要唤醒
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
// 恢复挂起的线程
LockSupport.unpark(node.thread);
return true;
}
条件变量的实现这块,需要将await和signal两部分连起来看才能看明白,这里我给出一副流程图来帮助大家理解等待唤醒过程,首先来看一下await的过程:
虽然await方法有很多代码,但是大部分代码都服务于唤醒后的处理操作。
下面来看一下signal的整个过程:
后续的流程就是互斥模式下阻塞等待锁的流程了,当t3线程抢到锁后,会判断在锁队列上阻塞等待期间是否发生了中断,如果是则会补充一次自我中断。
上面描述的是通过signal唤醒等待中的线程,还有一种是线程阻塞过程中被打断了,此时可能会发生如下两种情况:
当后续线程成功获取到锁被唤醒后,发现interruptMode被设置了THROW_IE标记,会抛出中断异常,表明此次返回并非条件为真返回,而是因为中断唤醒返回。
后续的流程就是互斥模式下阻塞等待锁的流程了,当t3线程抢到锁后,会补充一次自我中断。此时虽然发生了中断,但是最终不是因为中断返回被唤醒,而是signal操作率先完成了唤醒,但是等待期间发生了中断,所以补充了一次自我中断。
本文一开始分析过AQS框架适用的五个场景,这五个场景抽象来看都符合以下两个过程:
AQS框架使用模版方法模式将以上流程固定了下来,而判断的过程则交由子类来实现:
互斥模式下判断资源是否获取成功和共享模式下判断资源是否获取成功还是稍微有些区别的:
关于单纯应用共享模式和混合模式下的共享模式应用之间的区别可以参考下图:
读写锁场景下,如果只考虑获取读锁的场景,那么其实就如上图所示,读锁的获取总数有个最大限制,可以看做是对资源的最大数量限制。
Java AQS抽象队列同步器的大致设计思路也就讲述的差不多了,后半部分由于写作时间比较紧张,所以可能越到后面讲的就越草率了,如果有没看懂的地方,或者发现笔者写作错误可以在评论区指出或者私信与我讨论。
后台有小伙伴提出了一些疑问,这里补充说明一下:
关于第一个问题其实比较好理解,因为这里其实体现的是一种公平性,轮询前驱结点是为了确保自己在锁队列头部,这样自己才有资格抢锁。
关于第二个问题,因为条件可能只在signal的那个时刻是满足的,当前线程被唤醒后,再次尝试获取资源时,可能此时资源已经被其他线程拿走了,因此wait一般需要配合while循环使用。
关于并发编程基础这块,大家可以阅读我之前写的系列文章:
目前笔者也在筹划写一些关于硬件内存模型发展史到与java内存模式联系的文章,算是对本专栏知识点的一次补齐。
关于java并发这块,ConcurrentHashMap 的源码肯定是逃不掉的,笔者之前也写过一篇分析hash数据结构源码的文章,不仅仅是讲源码,更多是学习其用到的并发设计思维和技巧,感兴趣的可以阅读一下:
关于本文如果大家还有需要补充的内容可以私信与我讨论或者评论区留言。