AQS Condition await()方法
概述 目录
1.概述
2.同步队列 vs 条件队列
3.CondtionObject源码解析
4.await()源码解析
第1节 概述
Condition接口是通过的await/signal机制来实现同步的,此种设计方案用来代替监视器锁(Monitor/Synchronized)的wait/notify机制的,因此,与监视器锁的wait/notify机制对照着学习有助于我们更好的理解Conditon接口。Condition和监视器锁的对比如下。
(1)同步 :调用wait()方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之类似,调用await()方法的线程首先必须获得lock锁。
(2)等待 :调用wait()方法的线程会释放已经获得的监视器锁,进入当前监视器锁的等待队列中(忘记的同学可以翻一翻Synchronized相关视频资料);与之类似,调用await()方法的线程会释放已经获得的lock锁,进入到当前Condtion对应的条件队列中。
(3)唤醒 :调用监视器锁的notify()方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与监视器锁的竞争,并在获得锁后,从wait()方法处恢复执行;与之类似,调用Condtion的signal()方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在获得锁后,从await()方法处开始恢复执行。
第2节 同步队列 vs 条件队列
首先,在前面的课程中,我们说过独占锁的获取,所有等待锁的线程都会被包装成Node进入一个同步队列中。该同步队列如下:
同步阻塞队列(同步队列)是一个双向链表,我们使用prev、next属性来串联结点。但是在这个同步队列中,我们之前一直没有讲解nextWaiter属性,即使是在共享锁模式下,这一属性也只作为一个标记,指向了一个空结点,因此,在同步队列中,我们不会用它来串联结点。
每创建一个Condtion对象就会对应一个条件队列,每一个调用了Condtion对象的await()方法的线程都会被包装成Node放入一个条件队列中,就像这样:
可见,每一个Condition对象对应一个条件队列,每个条件队列都是独立的,互相不影响的。在上图中,如果我们对生产者消费者模型中的当前线程调用了notFull.await(), 则当前线程就会被包装成Node加到notFull队列的末尾。
第3节 CondtionObject源码解析
首先看一下CondtionObject实现的接口Condition的源码.
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
下面看一下Condition接口的实现类ConditionObject类的源码。
** 条件队列队头结点 */
private transient Node firstWaiter;
/** 条件队列队尾结点 */
private transient Node lastWaiter;
第4节 await()源码解析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()首先是将当前线程封装成Node扔进条件队列中,调用的addConditionWaiter()方法。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
LockSupport.park(this);用于挂起当前线程。
checkInterruptWhileWaiting()方法用于检测唤醒线程的方式——唤醒或线程中断。
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
线程被唤醒后,通过acquireQueued()方法进入同步队列争取锁。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
unlinkCancelledWaiters()方法从条件队列中删除取消的等待的线程。
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
reportInterruptAfterWait()方法汇报线程中断的状态。
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
await()方法执行流程如下图所示。