ReentrantLock reentrantLock = new ReentrantLock(true); Condition condition = reentrantLock.newCondition();
reentrantLock.newCondition()方法返回的对象类型是ConditionObject类型,ConditionObject是AbstractQueuedSynchronizer的内部类,它对象的创建依赖于外部类的对象,在它里面可以调用外部类中的方法。
try{ reentrantLock.lock(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { if(reentrantLock.isLocked()){ reentrantLock.unlock(); } }
还有的等待方法有: condition.await(5, TimeUnit.SECONDS); condition.awaitNanos(10000); condition.awaitUninterruptibly(); condition.awaitUntil(new Date())
condition.signal(); condition.signalAll();
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()方法:
public final void await() throws InterruptedException { // 如果线程中断了,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点 Node node = addConditionWaiter(); // 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话) int savedState = fullyRelease(node); int interruptMode = 0; // 如果当前节点不在同步队列中 while (!isOnSyncQueue(node)) { // 阻塞当前线程,进入等待状态 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter:
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { // 如果当前线程不是锁的独占线程,则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 去掉已经处于canceled状态的waiters,在清理时可能会改变lastWaiter的指向 unlinkCancelledWaiters(); // 重新将可能更新过的lastWaiter赋值给t t = lastWaiter; } // 新创建一个waitStatus状态为condition的节点 Node node = new Node(Node.CONDITION); if (t == null) // 如果t为null,则代表当前等待队列为空,firstWaiter就是当前节点 firstWaiter = node; else // t指向的是lastWaiter,所以这里将当前节点放在队列的最尾部 t.nextWaiter = node; // 将当前ConditionObject的lastWaiter指向node lastWaiter = node; return node; }
protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); }
该方法的作用主要是判断当前锁的独占线程是不是当前线程。
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease:
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final int fullyRelease(Node node) { try { // 这里调用的是AQS的getState方法 int savedState = getState(); // 调用的是AQS的release方法,如果成功则会释放当前线程所占有的锁并唤醒头节点的继任节点(如果存在的话),当然在 release方法内部又会调用tryRelease方法,该方法会根据公平锁和非公平锁有不同的实现(这个在上一篇中已经分析过。 if (release(savedState)) return savedState; // 如果release失败,则会抛出异常,然后在catch块中将节点waitStatus修改为canceled状态 throw new IllegalMonitorStateException(); } catch (Throwable t) { // 出现了异常之后将节点的waitStatus置为canceled状态 node.waitStatus = Node.CANCELLED; throw t; } }
java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue:
// 当一个节点以初始状态被放入了一个condition队列中,现在正处于AQS同步队列中等待重新获取锁时返回true final boolean isOnSyncQueue(Node node) { // condition节点的初始waitStatus为CONDITION // 当节点等待状态为condition或者节点的前置节点为null时返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果节点拥有后续节点,则表明该节点已经在AQS的同步队列中了 if (node.next != null) // If has successor, it must be on queue return true; //节点的前置节点不为空但是可能还没有在队列中的原因是通过cas将它添加到队列中的操作可能会失败。所以我们必须反转遍历(从尾节点开始遍历)来确保它实际上完成了入队。在此方法的调用中,它将始终靠近尾部,并且除非CAS失败(这不太可能),否则它将是在离尾部很近的地方,所以我们几乎没有遍历 return findNodeFromTail(node); }
// 当从后面的tail节点往前搜索时节点在同步队列中时返回true;会在isOnSyncQueue方法有需要的时候进行调用 private boolean findNodeFromTail(Node node) { // We check for node first, since it's likely to be at or near tail. // tail is known to be non-null, so we could re-order to "save" // one null check, but we leave it this way to help the VM. for (Node p = tail;;) { // 先校验当前节点,因为它可能就在tail节点附近。找到了就表明在已经在同步队列中了 if (p == node) return true; // tail 是不能为空的,所以当遍历到空时代表需要重新排序来保存 if (p == null) return false; // 从后向前查找 p = p.prev; } }
这个isOnSyncQueue主要用来判断当前节点是否在同步队列中,新添加的condition节点是不会在同步队列中的,它只位于condition队列中。
这里有必要区分下condition的等待队列和AQS的同步队列,因为可以不止一次的调用lock.newCondition方法,这说明AQS中不止维护了一个等待队列。object监视器上只能拥有一个同步队列和一个等待队列,而AQS却拥有一个同步队列,多个等待队列。具体如下图:
注:图片来源于网络。图片中上面是AQS的同步队列,下面是一个或多个condition队列。
condition wait状态下的中断
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1;
两种中断模式:
// 移动等待时间最长的线程,从conditon的等待队列到AQS的同步队列 public final void signal() { // 如果当前线程没有拥有独占权则抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 用本地变量保存firstWaiter Node first = firstWaiter; if (first != null) // 执行signal操作 doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ // 删除并转移condition队列中的节点到AQS队列中,直到击中不可取消的节点或空值。从signal中分离出来去让编译器在没有waiters的时候进行内联优化 private void doSignal(Node first) { do { // 将firstWaiter指向first节点的nextWaiter if ( (firstWaiter = first.nextWaiter) == null) // firstWaiter为null时将lastWaiter也指向null lastWaiter = null; // 将first节点的nextWaiter节点置为null first.nextWaiter = null; // 循环着尝试将节点从condition队列移到AQS的同步队列中 } while (!transferForSignal(first) && (first = firstWaiter) != null);// 这里需要注意的是,如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续 }
// 将一个节点从condition队列转移到AQS同步队列中去。成功时返回true final boolean transferForSignal(Node node) { // 如果改变节点状态失败则代表节点状态已经被改变了,会继续doSignal中的循环,如果节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
// 拼接到队列上并尝试设置前任节点的waitStatus以指示线程(可能)正在等待。如果取消或者尝试设置 // waitStatus失败,唤醒线程去重新同步(在这种情况下,waitStatus可能是出现了短暂而无害的错误) Node p = enq(node); // 将node节点入队并返回node节点的前置节点即之前AQS的前置节点 int ws = p.waitStatus; // ws>0代表p节点处于canceled状态,即已经被取消 // 尝试设置p节点为SIGNAL状态,设置失败时也会直接唤醒当前线程 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 唤醒当前线程 LockSupport.unpark(node.thread); return true; }
doSignal主要做了以下几件事:1、将头节点从等待队列移除(如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续);2、尝试将头节点状态由CONDITION改为0,即初始状态,如果失败并且节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal;3、将节点从同步队列尾部插入;4、在迁移的过程中如果遇到前置节点处于canceled状态或者waitStatus执行CAS将前置节点的状态改为Node.SIGNAL时失败的直接唤醒当前节点线程。这个方法的基础作用是唤醒当前condition的condition队列中等待时间最久的那个线程。
那么,在大多数情况下,竞争比较多时,condition的await方法的线程park状态是在哪里被唤醒的呢? 一个是前置节点出现上面第4种情况时,在transferForSignal方法中被唤醒,一个是在AQS的release方法中唤醒。如await方法中调用的fullyRelease方法。这两种都是很少量情况下的唤醒,注意一下,在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。
这时我们再回过头来看之前的await方法的代码:
public final void await() throws InterruptedException { // 如果线程中断了,则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点 Node node = addConditionWaiter(); // 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话) int savedState = fullyRelease(node); int interruptMode = 0; // 如果当前节点不在同步队列中,则会直接让当前线程进入等待状态,等待signal方法或者AQS的release方法唤醒(节点释放锁时会唤醒后继节点线程),while循环的作用是防止虚假唤醒,这一点在之前有写过专门的文章介绍过。 while (!isOnSyncQueue(node)) { // 阻塞当前线程,进入等待状态。注意当前节点是在condition队列中的,在调用signal方法后会将condition队列中的节点迁移至AQS队列中,然后在队列中竞争在AQS的锁所有权。 LockSupport.park(this);//注意这里Park的blocker是当前aqs对象 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 如果节点不在AQS同步队列中,没有进入上面的while循环,或者是在上面的循环中阻塞后被唤醒之后会进入这里 // 尝试获取锁的所有权或者在AQS队列中等待,具体代码分析见上一篇文章 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果node的nextWaiter不为null的时候会清理掉等待队列中的cancelled状态的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
接着来看java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting方法:
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
// 在取消等待后,如有必要,将节点迁移到AQS队列中。 final boolean transferAfterCancelledWait(Node node) { // cas 操作成功时则将节点入列,然后返回true if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { // 入队列 enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ // 如果我们没有被signal()方法唤醒,那么直到它完成enq()之前我们无法继续进行。在不完整的传输过程中取消是罕见且短暂的,因此只需自旋。直到node被加入到了sync队列中(也就是AQS队列),或许下一个signal()方法可以做到这一点 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
这个方法主要用来在await方法中线程被唤醒之后判断是否是因为有interrupted事件导致的等待中止。如果是因为中断事件导致的等待中止,则需要在取消等待后尝试将节点迁移到AQS队列中。有一个罕见情况是,如果迁移不成功,判断到节点仍然不在AQS的sync队列中,则让当前线程自旋,直到下一个signal方法或其他方式将节点迁移到队列中之后停止。
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { // 线程是否拥有独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { // 将当前condition的condition队列的firstWaiter和lastWaiter都置为null lastWaiter = firstWaiter = null; do { // 从头节开始,将condition队列中的所有节点都尝试迁移到AQS的sync队列中去 Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
signalAll方法的操作与signal方法的操作一样,只是一个调用的是doSignalAll方法,另一个调用的是doSignal方法。与doSignal方法只尝试将等待时间最长的节点唤醒。而doSignalAll是会尝试唤醒所有的condition队列中的节点。
ReentrantLock reentrantLock = new ReentrantLock(true); Condition condition = reentrantLock.newCondition(); new Thread(new Runnable() { @Override public void run() { reentrantLock.lock(); System.out.println("===================wait start==========="); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("=====================wait end=============="); reentrantLock.unlock(); } }).start();
new Thread(new Runnable() { @Override public void run() { reentrantLock.lock(); System.out.println("===================signal start==========="); condition.signal(); System.out.println("=====================signal end=============="); reentrantLock.unlock(); } }).start();
可以debug下源码进行分析。