说 Condition 前,需要说下 ConditioObject。ConditionObject 是同步器 AbstractQueuedSynchronzied 的内部类,因为 Condition 的操作需要关联的锁。ArrayBlockingQueue 就是 Condition 的具体应用。Object 中其实 也有 wait ,notify ,notifyAll 等操作, Condition 相当于将 wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为。
ArrayBlockingQueue 的构造函数。
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
通过构造函数,可以看到 Condition 的创建时需要关联锁的。
从队列中去取出(take)数据 。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
往队列中加入数据 enqueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
可以看主要用了 await signal 等方法。具体代表什么含义?
Condition 实现主要包含三个部分:等待队列、等待、通知。
如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。
等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。等待队列结构如下:
Condition 等待队列,也是包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入等待队列并进入等待状态。
Object 监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:
同步队列和等待队列
当调用 Condition 的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// node 不在节点中会一直 park 阻塞下去。达到等待的效果。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。但是如果不是通过 Condition.signal 进行唤醒的,而是对等待线程进行中断,那么会抛出 InterruptedException。
调用 Condition signal 方法后,当前线程会加入到等待队列,如下图所示:
调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively 检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。
节点从等待队列,移动到同步队列的操作过程如下:
image
通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。
被唤醒的线程,将从 await() 方法中的 while 循环中退出。从 await 方法看
// 当前节点已经在同步队列了,不会在循环下去了
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的 await 方法返回。此时线程已经成功获得了锁。
本文剖析了一下 Condition 的实现原理,等待队列,等待,通知的实现原理。