前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发-JUC-AQS-独占模式源码解析

Java并发-JUC-AQS-独占模式源码解析

作者头像
颍川
发布2021-12-06 15:51:43
2140
发布2021-12-06 15:51:43
举报
文章被收录于专栏:颍川颍川

文章目录

代码语言:txt
复制
    - ​
        - [说明](https://cloud.tencent.com/developer)
        - [疑问](https://cloud.tencent.com/developer)
        - ​
            - [为什么需要实现两种不同模式](https://cloud.tencent.com/developer)
            - [什么是独占模式](https://cloud.tencent.com/developer)
        - [概述](https://cloud.tencent.com/developer)
        - [源码分析](https://cloud.tencent.com/developer)
        - [总结](https://cloud.tencent.com/developer)
说明

每个 Java 工程师都应该或多或少地了解 AQS,我已经反复研究了很长时间,忘记了一遍又一遍地看它.每次我都有不同的经历.这一次,我打算重新拿出系统的源代码,并将其总结成一系列文章,以供将来查看.

一般来说,AQS规范是很难理解的,本次准备分五篇文章用来分析AQS框架:

  1. 第一篇(翻译AQS论文,理解AQS实现思路)
  2. 第二篇(介绍AQS基础属性,内部类,抽象方法)
  3. 第三篇(介绍独占模式的代码实现)
  4. 第四篇(介绍共享模式的代码实现)
  5. 第五篇(介绍Condition的相关代码实现)
疑问
为什么需要实现两种不同模式

大师给的解释是,虽然大多数应用程序应最大程度地提高总吞吐量,最大程度地容忍缺乏饥饿的概率。但是,在诸如资源控制之类的应用程序中,保持跨线程访问的公平性,容忍较差的聚合吞吐量更为重要,没有任何框架能够代表用户在这些相互冲突的目标之间做出决定;相反,必须适应不同的公平政策。所以AQS框架提供了两种模式

什么是独占模式

独占模式:每次只能有一个线程能持有资源;

概述

本篇文章为系列文章的第三篇,本篇文章介绍AQS独占模式的代码实现,首先,我们从总体过程入手,了解AQS排他性锁的执行逻辑,然后逐步深入分析了源代码。

获取锁的过程:

  1. acquire ()申请锁资源时,如果成功,它将进入临界区
  2. 当获取锁失败时,它进入一个 FIFO 等待队列并被阻塞,等待唤醒
  3. 当队列中的等待线程被唤醒时,会再次尝试获取锁资源。如果成功,它进入临界区,否则它将继续阻塞,等待唤醒 释放锁过程:
  4. 当线程调用release()来释放锁资源时,如果没有其他线程在等待锁资源,那么释放就完成了。
  5. 如果队列中有其他正在等待锁资源的线程需要被唤醒,则队列中的第一个等待节点(FIFO)将被唤醒。
源码分析

基于上面提到的获取和释放排他锁的一般过程,让我们来看看源代码实现逻辑.首先,让我们看看获取锁的acquire()方法。

代码语言:javascript
复制
 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSICE),arg)){
            selfInterrupt();
 }

虽然代码很短,但是缺包含很多内容,让我们一步一步来看:

  1. 第一步执行的是开发人员自己实现的tryAcquire()方法来尝试获取锁资源,如果成功则执行整个acquire()方法,既当前线程获得锁资源并进入临界值.
  2. 如果获取锁失败,它进入以下逻辑,从addWaiter(Node.EXCLUSIVE)方法开始.看看这个方法的源代码实现:
代码语言:javascript
复制
    /**
     * Creates and enqueues node for current thread and given mode.
     * 为当前线程和给定的模式创建并进入节点队列。
     * @param mode Node.EXCLUSIVE for exclusive,表示独占
     *             Node.SHARED for shared 表示共享
     * @return the new node
     *         返回新节点
     */
    private Node addWaiter(Node mode) {
        //根据当前线程创建新的节点
        //因为是独占模式,所以节点类型是Node.EXCLUSIVE
        Node node = new Node(Thread.currentThread(),mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 为了提高性能,我们首先执行一个快速的队列进入操作,也就是说,我们直接尝试在队列的末尾添加新节点
        // pred!=null表示已经有线程完成了节点初始化
        if (pred!=null) {
            node.prev = pred;
            //根据CAS的逻辑,即使并发操作也只能有一个线程能成功追加到尾节点并返回,
            //其余的线程将执行后续的排队操作.也就是enq()方法
            if (compareAndSetTail(pred,node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Inserts node into queue, initializing if necessary. See picture above
     * 将节点插入队列,必要时进行初始化。
     * @param node – the node to insert
     *             — 待插入的节点
     * @return node's predecessor
     *         前节点
     */
    private Node enq(final Node node){
        for(;;){
            Node t = tail;
            //tail 和 head 在这里初始化是因为AQS设计的是延迟初始化
            //tail 等于null,表示队列还没有初始化
            //如果队列还没有初始化,初始化,创建一个空的头节点
            if (t == null) {
                //类似地addWaiter,CAS只有一个线程可以成功初始化头节点,其余线程必须重复循环
                //初始化 tail head
                if (compareAndSetHead(new Node())){
                    tail = head;
                }
            } else {
                //新创建的前节点指向队列的末尾,在并发的情况下,毫无疑问会有多个新创建的节点的前节点会指向队列的末尾。
                node.prev = t;
                //根据这一步的CAS,无论前一步有多少新节点指向尾部节点,在这一步中只有一个能够真正成功加入设置,
                //其余的必须重新执行循环体
                if (compareAndSetTail(t, node)){
                   //将原tail节点的后节点设置为新tail节点
                   //由于CAS和设置next不是原子操作,因此可能出现更新tail节点成功
                   //但是未执行pred.next = node,导致无法从head遍历节点;
                   //但是由于前面已经设置了prev属性,因此可以从尾部遍历;
                   //像getSharedQueuedThreads、getExclusiveQueuedThreads都是从尾部开始遍历
                    t.next = node;
                    //循环的唯一退出操作是成功设置tail节点
                    return t;
                }
            }
        }
    }

以上入口操作要说明两点:首先,初始化队列的触发条件是线程当前占用锁资源,所以上面创建的空头节点可以看作是当前占用锁资源的节点(尽管它没有设置任何属性)。请注意,整个代码处于一个死循环中,直到设置成功.如果他们失败了,他们会一次又一次地尝试。

完成上述操作后,我们申请获取锁的线程成功加入了等待队列。通过本文开头提到的独占锁获取过程,节点现在需要做的是挂起当前线程并等待唤醒。这一逻辑如何实现?看看源代码:

代码语言:javascript
复制
    //通过以上分析,node是刚刚进入包含当前线程信息的队列的节点
    final boolean acquireQueued(final Node node, int arg) {
        //获取锁定资源失败标记位
        boolean failed = true;
        try {
            //正在等待线程的中断标记位
            boolean interrupted = false;
            //该循环体的执行时序包括两种:新节点队列和等待节点唤醒的队列
            for (;;) {
                //获取当前节点的前节点
                final Node p = node.predecessor();
                //如果前端节点是头节点,尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                    //当前节点获取锁资源后,设置为Header节点
                    //Header节点表示当前占用锁资源的节点
                    setHead(node);
                    p.next = null; // help GC
                    //表示成功获取锁资源,因此failed被设为false
                    failed = false;
                    //返回一个中断标志,用来表示当前节点是被正常唤醒还是被中断唤醒
                    return interrupted;
                }
                //如果锁不成功,则进入挂起逻辑
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //最后,获取锁故障处理逻辑。
            if (failed)
                cancelAcquire(node);
        }
    }

挂起逻辑是一种非常重要的逻辑。我们单独分析一下。首先,我们应该注意到这样一个事实:到目前为止,我们只基于当前线程和节点类型创建了一个节点,并加入了队列。其他属性是默认值

代码语言:javascript
复制
    //node是当前线程的节点,pred是它的pre-node参数。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前节点前节点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
             ///如果前节点的等待状态为Node.SIGNAL,它返回true,
             //然后执行parkAndCheckInterrupt()方法挂起它。
            return true;
        if (ws > 0) {
            //waitStatus中有几个值表示前节点被取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //这里我们从当前节点的前节点开始,循环查找没有被取消的节点(其实就是从队列删除取消状态的节点)
            ///注意,由于Header节点是由new Node()创建的,waitStatus为0,
            //所以这里不会出现空指针的问题,也就是说最多循环到Header节点退出。
            pred.next = node;
        } else {
            //根据waitStatus的取值限制,waitStatus只能为0或Propagate,
            //我们将前节点的waitStatus设置为Node.SIGNAL,重新进入判断方法
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;//返回false,表示不需要调用park
    }

上述方法的逻辑更为复杂。用于判断当前节点是否可以挂起,即是否满足唤醒条件,如果挂起,则必须由其他线程唤醒。如果该方法返回false,表示挂起条件未完成,则会重新执行acquireQueued方法的循环体,重新进行判断.如果返回tru,则意味着一切就绪,可以挂起,它将进入parkAndCheckInterrupt()方法,一起来看下源代码吧:

代码语言:javascript
复制
    private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程
        LockSupport.park(this); 
        //唤醒后,返回中断标志,也就是说,如果唤醒是正常的,则返回false,如果唤醒是中断的,则返回true。
        return Thread.interrupted();
    }

查看acquireQueued方法中的源代码。如果中断被唤醒,将interrupt标志设置为true。无论是正常唤醒还是从中断中唤醒,您都尝试获取锁定资源。如果成功,则返回中断标志,否则将暂停等待。

注意:Thread.interrupted()方法在返回中断标志时清除中断标志,也就是说,当中断被唤醒并且锁成功时,整个quireQueued方法返回TRUE以指示中断被唤醒,但是如果中断被唤醒并且锁没有被获取,则它继续挂起,因为中断已经被清除,并且如果下一次是正常的。

唤醒,quireQueued方法返回false,表示没有中断。

最后,我们返回到acquireQueued方法的最后一步,即finally模块.这是在锁定资源获取失败之后完成的一些后续工作, 看上面的代码,实际上可以在这里输入的是tryAcquire()方法引发异常. 也就是说,如果AQS框架为开发人员自己实现的锁获取操作抛出异常,它也会进行适当的处理.让我们一起看一下源代码:

代码语言:javascript
复制
    //传入方法参数为当前获取锁资源失败的节点
    private void cancelAcquire(Node node) {
        //如果节点不存在,直接忽略
        if (node == null)
            return;

        node.thread = null;

        //跳过所有取消的前置节点,类似上面的跳转逻辑
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //这是前面节点的后继节点.由于上面跳转节点的操作,可能不是这里的当前节点.仔细想想。
        Node predNext = pred.next;

        //将当前节点的waitStatus设置为CANCELLED,以便其他节点在处理时将跳过该节点
        node.waitStatus = Node.CANCELLED;

        //如果直接删除当前的尾部节点,即从队列中删除
        //注意:这里不需要关心CAS失败,因为即使并发失败,节点也已经成功删除。
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //这里的判断逻辑非常模糊,特别是当前节点的前端节点不是头节点,
                    //后面的节点等待唤醒(waitStatus小于0)
                    //另外,如果不取消当前节点的后继节点,则前一个节点与后一个节点连接,相当于删除当前节点。
                    compareAndSetNext(pred, predNext, next);
            } else {
                //在这里输入,或者当前节点的前节点是头节点,或者前面节点的waitStatus是PROPAGATE,它直接唤醒当前节点的后续节点。
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

上面分析的是不可中断获取锁的实现,但是AQS还提供了可中断,可定时的获取锁操作,我们也一起看下吧

代码语言:javascript
复制
   //可中断获取锁
   public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //区别是 如果响应中断,则直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

有上面代码可以看出,可中断获取锁和不可中断的区别就是,是标记中断,还是直接抛出异常

下面在来看下可定时获取锁源码:

代码语言:javascript
复制
   //尝试以独占模式获取,如果中断则中止,如果超过给定超时则失败。。
   public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
        private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSICE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return true;
                }
                //如果时间已经小于0,则直接返回false 放弃获取锁
                nanosTimeout = deadline -System.nanoTime();
                if (nanosTimeout <= 0L) {
                    return false;
                }
                //如果锁不成功,同时当前时间剩余时间大于spinForTimeoutThreshold,则阻塞nanosTimeout时间
                if (shouldParkAfterFailedAcquire(p,node) && nanosTimeout > spinForTimeoutThreshold) {
                    LockSupport.parkNanos(this,nanosTimeout);
                }

                if (Thread.interrupted()){
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed) {
                cancelAcquire(node);
            }
        }
    }

以上是exclusive mode acquisition lock(独占模式获取锁)的核心源代码,这真的是非常难理解,这些方法需要重复很多次,才能慢慢理解。

接下来,看看释放锁的过程:

代码语言:javascript
复制
 public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

TryRelease()方法是用户定义的释放锁逻辑。如果成功,则判断等待队列中是否有需要唤醒的节点(waitStatus 0表示没有需要唤醒的节点)。

一起看下唤醒操作的实现吧:

代码语言:javascript
复制
  private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            //将标记设置为0表示唤醒操作已经启动,并提高了并发环境中的性能
            compareAndSetWaitStatus(node, ws, 0);

        //如果当前节点的后继节点为null,或者已经被取消
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //请注意,循环不会中断,也就是说,它会来回查找,直到找到最近的节点,等待从当前节点唤醒。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //进行唤醒操作
        if (s != null)
            LockSupport.unpark(s.thread);
    }

相比之下,释放锁要简单得多,代码也少得多。

总结

以上是AQS排他锁的获取和释放过程。一般的想法很简单:尝试获取锁,并在失败时加入队列挂起。当锁被释放时,如果队列中有等待的线程,锁就会被唤醒。但如果你一步一步跟踪源代码,你会发现有很多细节,很多地方很难理解,我已经学了一遍又一遍,但我不敢说,我研究了AQS,甚至不敢说,上述研究结果是正确的,只是写一篇总结,与同行交流经验。除了排他锁之外,稍后还将推出AQS系列文章,包括共享锁、条件队列的实现原理等等。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
    • 说明
      • 疑问
        • 为什么需要实现两种不同模式
        • 什么是独占模式
      • 概述
        • 源码分析
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档