本文,我们来介绍 java 中线程同步的另一个基础工具类 — AQS。
AQS (Abstract Queued Synchronizer) 是 JDK 提供的一套基于 FIFO 同步队列的阻塞锁和相关同步器的一个同步框架,通过 AQS 我们可以很容易地实现我们自己需要的独占锁或共享锁。 java 中,我们曾经介绍过的信号量、ReentrantLock、CountDownLatch 等工具都是通过 AQS 来实现的。
AQS 又称为队列同步器,java 是通过 AbstractQueuedSynchronizer 类来实现其思想的。 他通过一个 int 类型的成员变量 state 来控制同步状态,state = 0 时,则说明没有任何线程占用锁,当 state = 1 时,则说明有一个线程目前正在占用锁。 它支持实现共享锁与独占锁,下面我们就从源码来剖析,分析一下 AQS 的实现原理。
AbstractQueuedSynchronizer 继承自抽象类 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 这个类定义了存储独占当前锁的线程和获取的方法。 AbstractQueuedSynchronizer 类通过内部类 Node 构成的 FIFO 同步队列来完成线程获取锁的排队工作。 同时,AbstractQueuedSynchronizer 通过 ConditionObject 来构建等待队列。
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 标识线程已处于结束状态
static final int CANCELLED = 1;
// 等待被唤醒状态
static final int SIGNAL = -1;
// 条件状态
static final int CONDITION = -2;
// 在共享模式中使用表示获得的同步状态会被传播
static final int PROPAGATE = -3;
// 等待状态, 存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
volatile int waitStatus;
// 同步队列中前驱结点
volatile Node prev;
// 同步队列中后继结点
volatile Node next;
// 请求锁的线程
volatile Thread thread;
// 等待队列中的后继结点
Node nextWaiter;
//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//.....
}
Node 类是一个典型的双向链表元素结构,拥有前驱和后继的引用。 其中 SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式,所谓共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore 采用的就是基于 AQS 的共享模式实现的。 而独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock。
常量 | 值 | 意义 |
---|---|---|
CANCELLED | 1 | 同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,节点终极状态 |
SIGNAL | -1 | 等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行 |
CONDITION | -2 | 该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁 |
PROPAGATE | -3 | 在共享模式中,该状态标识结点的线程处于可运行状态 |
0 | 0 | 初始状态 |
AbstractQueuedSynchronizer 类作为抽象的基础框架类,通过定义模板方法的方式提供了一套实现锁的模板,其最基本的锁实现方式需要子类复写模板:
protected boolean tryAcquire(int arg); // 获取独占锁
protected boolean tryRelease(int arg); // 释放独占锁
protected int tryAcquireShared(int arg); // 获取共享锁
protected boolean tryReleaseShared(int arg); // 释放共享锁
protected boolean isHeldExclusively(); // 判断是否持有独占锁
ReentrantLock、Semaphore、CountDownLatch 等工具都是通过复写上述若干方法实现的。
AbstractQueuedSynchronizer 通过 acquire 方法获取锁。 下图是整个加锁过程的流程图。
通过整个流程,我们看到,独占模式获取锁的原则就是:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire 方法调用了我们上面已经提到的需要子类复写的获取独占锁方法 tryAcquire,框架只定义了获取失败后如何处理 — addWaiter。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这是一个典型的双向队列节点入队操作,毋庸细说,他通过我们已经讲到过的 CAS 操作实现了尾节点的判断和更新。
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
而兜底方法 enq 有两种可能被调用。 1. 当前队列为空,尚没有任何元素存在 2. 并发环境下,原子操作 compareAndSetTail 失败
因此,enq 方法中承担了两部分工作的处理:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到,enq 方法中通过原子操作和循环的方式实现了并发环境下尾节点的添加,之所以这部分逻辑从 addWaiter 中抽离,是因为大部分情况下是不会出现上述两种可能的,将正常业务逻辑与特殊且不常用业务逻辑分离,是值得学习的一种代码组织方式。
如上所述,并发环境下有可能在插入列表之前尚需要等待锁,但在插入列表后,马上又可以获取到锁,因此此时再次获取锁就可以减少不必要的等待。 于是,在上述 addWaiter 方法执行完,又执行了 acquireQueued 方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋操作
for (;;) {
// 获取当前插入节点的前置节点
final Node p = node.predecessor();
// 如果当前节点的前置节点是 head 则重新获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果需要线程进入等待,则等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
最终,通过 shouldParkAfterFailedAcquire 方法判断是否需要让线程挂起,如果需要挂起,则调用 parkAndCheckInterrupt 方法挂起线程。 线程的挂起最终是通过 Unsafe 的 static native 方法 park 实现的。
最终,无论是线程获取锁过程中发生异常还是超时唤醒,都需要将当前 node 出队,这就是 cancelAcquire 方法做的事。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
可以看到,上面这段代码处理了三种情况: 1. node 是队尾节点 2. node 的前驱不是队首节点,node 也不是队尾节点 3. node 的前驱是队首节点
其中,对于情况 1 和情况 2,都进行了 node 的出队,但是第三种情况却没有执行出队方法,这是为什么呢? 原因是当前节点已经被标记为 CANCELLED 状态,那么,在后继节点执行 shouldParkAfterFailedAcquire 方法时,会先让其前驱节点,也就是我们当前的 node 节点出队。
由上所述,当 node 是队首节点的后继时,执行了 unparkSuccessor 方法。 当前节点是队列中唯一等待锁的节点,所以必须让出获取锁的权限,让他的后继去获取锁,这就是 unparkSuccessor 方法做的事情。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
代码通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒了该节点对应的线程。 LockSupport.unpark 最终调用了 UNSAFE 类的 unpark 方法。
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
那么,被唤醒的线程执行了什么呢?我们要找到该线程是什么时候被挂起的,那就是在上述的 acquireQueued 方法中,我们回看 acquireQueued 方法,被唤醒的线程在执行完 Thread.interrupted() 方法后继续循环,尝试获取锁,从而保证了锁的独占与竞争。 parkAndCheckInterrupt 方法并没有对 Thread.interrupted() 的返回值做任何处理,由此可见,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的。
上面提到,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的,AQS 还提供了另一个方法 acquireInterruptibly 加锁,从而让获取锁失败等待的线程可以被中断。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
可以看到,这里获取锁失败后调用了 doAcquireInterruptibly 方法,doAcquireInterruptibly 方法与 acquire 的结构非常像:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到,这里仍然是通过独占模式获取锁,但是在 parkAndCheckInterrupt 返回后,与此前 acquire 方法中继续循环的方式不同,取而代之的,抛出了 InterruptedException 异常,中断线程执行。
独占锁的解锁较为简单,因为加锁成功后,该线程对应的节点已经从同步队列中移除,此处如果解锁成功,只需要唤醒下一个节点去竞争锁即可。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里仍然调用了 unparkSuccessor 方法,通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒线程。
从图上可以看到,共享锁的加锁主要做了下面三项工作: 1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。 2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。 3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。
整个流程与独占锁的加锁流程非常类似,最大的区别就是 setHeadAndPropagate 方法,这个方法做的事情就是我们上面提到的:
//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //记录当前头节点
//设置新的头节点,即把当前获取到锁的节点设置为头节点
//注:这里是获取到锁之后的操作,不需要并发控制
setHead(node);
//这里意思有两种情况是需要执行唤醒操作
//1.propagate > 0 表示调用方指明了后继节点需要被唤醒
//2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
doReleaseShared 方法进行了后续节点的唤醒。
private void doReleaseShared() {
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
我们再来看共享锁的解锁操作。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
结合上面我们已经介绍过的 doReleaseShared 方法源码,共享锁的解锁非常简单,与独占模式的解锁一样,他在解锁成功以后进行了后续节点的唤醒操作,从而保证锁的竞争。