前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析二之Condition

AQS源码分析二之Condition

作者头像
山行AI
发布2020-03-25 18:07:46
5480
发布2020-03-25 18:07:46
举报
文章被收录于专栏:山行AI山行AI

Condition的使用

创建
代码语言:javascript
复制
 ReentrantLock reentrantLock = new ReentrantLock(true); Condition condition = reentrantLock.newCondition();

reentrantLock.newCondition()方法返回的对象类型是ConditionObject类型,ConditionObject是AbstractQueuedSynchronizer的内部类,它对象的创建依赖于外部类的对象,在它里面可以调用外部类中的方法。

等待
代码语言:javascript
复制
  try{            reentrantLock.lock();            condition.await();        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            if(reentrantLock.isLocked()){                reentrantLock.unlock();            }        }

还有的等待方法有:        condition.await(5, TimeUnit.SECONDS);        condition.awaitNanos(10000);        condition.awaitUninterruptibly();        condition.awaitUntil(new Date())
唤醒
代码语言:javascript
复制
 condition.signal();        condition.signalAll();

Condition实现类ConditionObject

代码语言:javascript
复制
 public class ConditionObject implements Condition, java.io.Serializable {        private static final long serialVersionUID = 1173984872572414699L;        /** First node of condition queue. */        private transient Node firstWaiter;        /** Last node of condition queue. */        private transient Node lastWaiter;
  • firstWaiter:指向condition队列的第一个节点;
  • lastWaiter:指向condition队列的最后一个节点。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()方法:

代码语言:javascript
复制
public final void await() throws InterruptedException {            // 如果线程中断了,则抛出异常            if (Thread.interrupted())                throw new InterruptedException();            // 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点            Node node = addConditionWaiter();            // 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话)            int savedState = fullyRelease(node);            int interruptMode = 0;            // 如果当前节点不在同步队列中            while (!isOnSyncQueue(node)) {                // 阻塞当前线程,进入等待状态                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter:

代码语言:javascript
复制
/**         * Adds a new waiter to wait queue.         * @return its new wait node         */        private Node addConditionWaiter() {            // 如果当前线程不是锁的独占线程,则抛出异常            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node t = lastWaiter;            // If lastWaiter is cancelled, clean out.            if (t != null && t.waitStatus != Node.CONDITION) {                // 去掉已经处于canceled状态的waiters,在清理时可能会改变lastWaiter的指向                unlinkCancelledWaiters();                // 重新将可能更新过的lastWaiter赋值给t                t = lastWaiter;            }            // 新创建一个waitStatus状态为condition的节点            Node node = new Node(Node.CONDITION);            if (t == null)                // 如果t为null,则代表当前等待队列为空,firstWaiter就是当前节点                firstWaiter = node;            else                // t指向的是lastWaiter,所以这里将当前节点放在队列的最尾部                t.nextWaiter = node;            // 将当前ConditionObject的lastWaiter指向node            lastWaiter = node;            return node;        }
  • isHeldExclusively方法调用的是ConditionObject外部类AQS对象中的方法,我们看下ReentrantLock类中的isHeldExclusively方法:
代码语言:javascript
复制
   protected final boolean isHeldExclusively() {            // While we must in general read state before owner,            // we don't need to do so to check if current thread is owner            return getExclusiveOwnerThread() == Thread.currentThread();        }

该方法的作用主要是判断当前锁的独占线程是不是当前线程。

java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease:

代码语言:javascript
复制
 /**     * Invokes release with current state value; returns saved state.     * Cancels node and throws exception on failure.     * @param node the condition node for this wait     * @return previous sync state     */    final int fullyRelease(Node node) {        try {            // 这里调用的是AQS的getState方法            int savedState = getState();            // 调用的是AQS的release方法,如果成功则会释放当前线程所占有的锁并唤醒头节点的继任节点(如果存在的话),当然在 release方法内部又会调用tryRelease方法,该方法会根据公平锁和非公平锁有不同的实现(这个在上一篇中已经分析过。            if (release(savedState))                return savedState;            // 如果release失败,则会抛出异常,然后在catch块中将节点waitStatus修改为canceled状态            throw new IllegalMonitorStateException();        } catch (Throwable t) {            // 出现了异常之后将节点的waitStatus置为canceled状态            node.waitStatus = Node.CANCELLED;            throw t;        }    }

java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue:

代码语言:javascript
复制
    // 当一个节点以初始状态被放入了一个condition队列中,现在正处于AQS同步队列中等待重新获取锁时返回true    final boolean isOnSyncQueue(Node node) {        // condition节点的初始waitStatus为CONDITION        // 当节点等待状态为condition或者节点的前置节点为null时返回false        if (node.waitStatus == Node.CONDITION || node.prev == null)            return false;        // 如果节点拥有后续节点,则表明该节点已经在AQS的同步队列中了        if (node.next != null) // If has successor, it must be on queue            return true;        //节点的前置节点不为空但是可能还没有在队列中的原因是通过cas将它添加到队列中的操作可能会失败。所以我们必须反转遍历(从尾节点开始遍历)来确保它实际上完成了入队。在此方法的调用中,它将始终靠近尾部,并且除非CAS失败(这不太可能),否则它将是在离尾部很近的地方,所以我们几乎没有遍历        return findNodeFromTail(node);    }
    // 当从后面的tail节点往前搜索时节点在同步队列中时返回true;会在isOnSyncQueue方法有需要的时候进行调用    private boolean findNodeFromTail(Node node) {        // We check for node first, since it's likely to be at or near tail.        // tail is known to be non-null, so we could re-order to "save"        // one null check, but we leave it this way to help the VM.        for (Node p = tail;;) {            // 先校验当前节点,因为它可能就在tail节点附近。找到了就表明在已经在同步队列中了            if (p == node)                return true;            // tail 是不能为空的,所以当遍历到空时代表需要重新排序来保存            if (p == null)                return false;            // 从后向前查找            p = p.prev;        }    }

这个isOnSyncQueue主要用来判断当前节点是否在同步队列中,新添加的condition节点是不会在同步队列中的,它只位于condition队列中。

这里有必要区分下condition的等待队列和AQS的同步队列,因为可以不止一次的调用lock.newCondition方法,这说明AQS中不止维护了一个等待队列。object监视器上只能拥有一个同步队列和一个等待队列,而AQS却拥有一个同步队列,多个等待队列。具体如下图:

注:图片来源于网络。图片中上面是AQS的同步队列,下面是一个或多个condition队列。

condition wait状态下的中断

代码语言:javascript
复制
    /*         * For interruptible waits, we need to track whether to throw         * InterruptedException, if interrupted while blocked on         * condition, versus reinterrupt current thread, if         * interrupted while blocked waiting to re-acquire.         */
        /** Mode meaning to reinterrupt on exit from wait */        private static final int REINTERRUPT =  1;        /** Mode meaning to throw InterruptedException on exit from wait */        private static final int THROW_IE    = -1;

两种中断模式:

  • REINTERRUPT:wait状态下退出时重新尝试中断;
  • THROW_IE:wait状态下退出时抛出InterruptedException。

唤醒

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal方法
代码语言:javascript
复制
        // 移动等待时间最长的线程,从conditon的等待队列到AQS的同步队列        public final void signal() {            // 如果当前线程没有拥有独占权则抛出异常            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            // 用本地变量保存firstWaiter            Node first = firstWaiter;            if (first != null)                // 执行signal操作                doSignal(first);        }
 /**         * Removes and transfers nodes until hit non-cancelled one or         * null. Split out from signal in part to encourage compilers         * to inline the case of no waiters.         * @param first (non-null) the first node on condition queue         */        // 删除并转移condition队列中的节点到AQS队列中,直到击中不可取消的节点或空值。从signal中分离出来去让编译器在没有waiters的时候进行内联优化        private void doSignal(Node first) {            do {                // 将firstWaiter指向first节点的nextWaiter                if ( (firstWaiter = first.nextWaiter) == null)                    // firstWaiter为null时将lastWaiter也指向null                    lastWaiter = null;                // 将first节点的nextWaiter节点置为null                first.nextWaiter = null;                // 循环着尝试将节点从condition队列移到AQS的同步队列中            } while (!transferForSignal(first) &&                     (first = firstWaiter) != null);// 这里需要注意的是,如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续        }

    // 将一个节点从condition队列转移到AQS同步队列中去。成功时返回true    final boolean transferForSignal(Node node) {        // 如果改变节点状态失败则代表节点状态已经被改变了,会继续doSignal中的循环,如果节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))            return false;
        // 拼接到队列上并尝试设置前任节点的waitStatus以指示线程(可能)正在等待。如果取消或者尝试设置        // waitStatus失败,唤醒线程去重新同步(在这种情况下,waitStatus可能是出现了短暂而无害的错误)        Node p = enq(node); // 将node节点入队并返回node节点的前置节点即之前AQS的前置节点        int ws = p.waitStatus;        // ws>0代表p节点处于canceled状态,即已经被取消        // 尝试设置p节点为SIGNAL状态,设置失败时也会直接唤醒当前线程        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))            // 唤醒当前线程            LockSupport.unpark(node.thread);        return true;    }

doSignal主要做了以下几件事:1、将头节点从等待队列移除(如果头节点迁移失败,存在firstWaiter时会尝试唤醒这个firstWaiter,循环会继续);2、尝试将头节点状态由CONDITION改为0,即初始状态,如果失败并且节点存在下一个waiter,则尝试对节点的下一个waiter进行transferForSignal;3、将节点从同步队列尾部插入;4、在迁移的过程中如果遇到前置节点处于canceled状态或者waitStatus执行CAS将前置节点的状态改为Node.SIGNAL时失败的直接唤醒当前节点线程。这个方法的基础作用是唤醒当前condition的condition队列中等待时间最久的那个线程。

那么,在大多数情况下,竞争比较多时,condition的await方法的线程park状态是在哪里被唤醒的呢? 一个是前置节点出现上面第4种情况时,在transferForSignal方法中被唤醒,一个是在AQS的release方法中唤醒。如await方法中调用的fullyRelease方法。这两种都是很少量情况下的唤醒,注意一下,在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。

这时我们再回过头来看之前的await方法的代码:

代码语言:javascript
复制
public final void await() throws InterruptedException {            // 如果线程中断了,则抛出异常            if (Thread.interrupted())                throw new InterruptedException();            // 将当前waitStatus为condition状态的节点添加到condition的waiter队列中(将会在其中clean掉已经处于canceled状态的节点            Node node = addConditionWaiter();            // 获取到当前aqs的state(当前线程对锁的所有占有权),释放掉当前线程对锁的所有占有权,并调用aqs的release方法唤醒当前aqs头节点的继承节点(如果有的话)            int savedState = fullyRelease(node);            int interruptMode = 0;            // 如果当前节点不在同步队列中,则会直接让当前线程进入等待状态,等待signal方法或者AQS的release方法唤醒(节点释放锁时会唤醒后继节点线程),while循环的作用是防止虚假唤醒,这一点在之前有写过专门的文章介绍过。            while (!isOnSyncQueue(node)) {                // 阻塞当前线程,进入等待状态。注意当前节点是在condition队列中的,在调用signal方法后会将condition队列中的节点迁移至AQS队列中,然后在队列中竞争在AQS的锁所有权。                LockSupport.park(this);//注意这里Park的blocker是当前aqs对象                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }              // 如果节点不在AQS同步队列中,没有进入上面的while循环,或者是在上面的循环中阻塞后被唤醒之后会进入这里            // 尝试获取锁的所有权或者在AQS队列中等待,具体代码分析见上一篇文章            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            // 如果node的nextWaiter不为null的时候会清理掉等待队列中的cancelled状态的节点            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }
  • 阻塞等待的地方主要有两个:1. while (!isOnSyncQueue(node))循环中的LockSupport.park(this);2. acquireQueued的时候如果不能获取到锁也会进行LockSupport.park操作。
  • while循环中包含LockSupport.park操作时,在unpark之前while循环还在不停地判断。
  • 注意一下,在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。await方法中的park操作结束,接下来就会进入到acquireQueued中尝试让被唤醒的节点重新来竞争锁的占有权。

接着来看java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting方法:

代码语言:javascript
复制
 /**         * Checks for interrupt, returning THROW_IE if interrupted         * before signalled, REINTERRUPT if after signalled, or         * 0 if not interrupted.         */        private int checkInterruptWhileWaiting(Node node) {            return Thread.interrupted() ?                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :                0;        }
    // 在取消等待后,如有必要,将节点迁移到AQS队列中。    final boolean transferAfterCancelledWait(Node node) {        // cas 操作成功时则将节点入列,然后返回true        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {            // 入队列            enq(node);            return true;        }        /*         * If we lost out to a signal(), then we can't proceed         * until it finishes its enq().  Cancelling during an         * incomplete transfer is both rare and transient, so just         * spin.         */        // 如果我们没有被signal()方法唤醒,那么直到它完成enq()之前我们无法继续进行。在不完整的传输过程中取消是罕见且短暂的,因此只需自旋。直到node被加入到了sync队列中(也就是AQS队列),或许下一个signal()方法可以做到这一点        while (!isOnSyncQueue(node))            Thread.yield();        return false;    }

这个方法主要用来在await方法中线程被唤醒之后判断是否是因为有interrupted事件导致的等待中止。如果是因为中断事件导致的等待中止,则需要在取消等待后尝试将节点迁移到AQS队列中。有一个罕见情况是,如果迁移不成功,判断到节点仍然不在AQS的sync队列中,则让当前线程自旋,直到下一个signal方法或其他方式将节点迁移到队列中之后停止。

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll

代码语言:javascript
复制
 /**         * Moves all threads from the wait queue for this condition to         * the wait queue for the owning lock.         *         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}         *         returns {@code false}         */        public final void signalAll() {            // 线程是否拥有独占锁            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignalAll(first);        }  
/**         * Removes and transfers all nodes.         * @param first (non-null) the first node on condition queue         */        private void doSignalAll(Node first) {            // 将当前condition的condition队列的firstWaiter和lastWaiter都置为null            lastWaiter = firstWaiter = null;            do {                // 从头节开始,将condition队列中的所有节点都尝试迁移到AQS的sync队列中去                Node next = first.nextWaiter;                first.nextWaiter = null;                transferForSignal(first);                first = next;            } while (first != null);        }

signalAll方法的操作与signal方法的操作一样,只是一个调用的是doSignalAll方法,另一个调用的是doSignal方法。与doSignal方法只尝试将等待时间最长的节点唤醒。而doSignalAll是会尝试唤醒所有的condition队列中的节点。

总结

  • await方法除了会让当前线程进行等待状态外,还会释放掉当前锁的占有权。await的其他方法无非是在此基础上添加了等待的条件,比如时间和中断等。
  • signalAll方法的操作与signal方法的操作一样,只是一个调用的是doSignalAll方法,另一个调用的是doSignal方法。与doSignal方法只尝试将等待时间最长的节点唤醒。而doSignalAll是会尝试唤醒所有的condition队列中的节点。
  • 在await方法中执行 LockSupport.park(this)的是一个while循环,当while循环条件改变时,也就是说将节点从condition的等待队列迁移到AQS的sync队列之后,while条件不再生效,就会继续向下执行。await方法中的park操作结束,接下来就会进入到acquireQueued中尝试让被唤醒的节点重新来竞争锁的占有权。
  • 示例:
代码语言:javascript
复制
 ReentrantLock reentrantLock = new ReentrantLock(true);        Condition condition = reentrantLock.newCondition();        new Thread(new Runnable() {            @Override            public void run() {                reentrantLock.lock();                System.out.println("===================wait start===========");                try {                    condition.await();                } catch (InterruptedException e) {                    e.printStackTrace();                }                System.out.println("=====================wait end==============");                reentrantLock.unlock();            }        }).start();
        new Thread(new Runnable() {            @Override            public void run() {                reentrantLock.lock();                System.out.println("===================signal start===========");                condition.signal();                System.out.println("=====================signal  end==============");                reentrantLock.unlock();            }        }).start();

可以debug下源码进行分析。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Condition的使用
  • Condition实现类ConditionObject
  • 唤醒
    • java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
    • 总结
    相关产品与服务
    腾讯云代码分析
    腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档