前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

作者头像
用户3147702
发布2022-06-27 12:40:06
7620
发布2022-06-27 12:40:06
举报
文章被收录于专栏:小脑斧科技博客

1. 概述

本文,我们来介绍 java 中线程同步的另一个基础工具类 — AQS。

AQS (Abstract Queued Synchronizer) 是 JDK 提供的一套基于 FIFO 同步队列的阻塞锁和相关同步器的一个同步框架,通过 AQS 我们可以很容易地实现我们自己需要的独占锁或共享锁。 java 中,我们曾经介绍过的信号量、ReentrantLock、CountDownLatch 等工具都是通过 AQS 来实现的。

2. AQS — Abstract Queued Synchronizer

AQS 又称为队列同步器,java 是通过 AbstractQueuedSynchronizer 类来实现其思想的。 他通过一个 int 类型的成员变量 state 来控制同步状态,state = 0 时,则说明没有任何线程占用锁,当 state = 1 时,则说明有一个线程目前正在占用锁。 它支持实现共享锁与独占锁,下面我们就从源码来剖析,分析一下 AQS 的实现原理。

3. 类图

AbstractQueuedSynchronizer 继承自抽象类 AbstractOwnableSynchronizer,AbstractOwnableSynchronizer 这个类定义了存储独占当前锁的线程和获取的方法。 AbstractQueuedSynchronizer 类通过内部类 Node 构成的 FIFO 同步队列来完成线程获取锁的排队工作。 同时,AbstractQueuedSynchronizer 通过 ConditionObject 来构建等待队列。

4. Node

代码语言:javascript
复制
static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // 标识线程已处于结束状态
    static final int CANCELLED =  1;
    // 等待被唤醒状态
    static final int SIGNAL    = -1;
    // 条件状态
    static final int CONDITION = -2;
    // 在共享模式中使用表示获得的同步状态会被传播
    static final int PROPAGATE = -3;

    // 等待状态, 存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
    volatile int waitStatus;

    // 同步队列中前驱结点
    volatile Node prev;

    // 同步队列中后继结点
    volatile Node next;

    // 请求锁的线程
    volatile Thread thread;

    // 等待队列中的后继结点
    Node nextWaiter;

    //判断是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //获取前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}

Node 类是一个典型的双向链表元素结构,拥有前驱和后继的引用。 其中 SHARED 和 EXCLUSIVE 常量分别代表共享模式和独占模式,所谓共享模式是一个锁允许多条线程同时操作,如信号量 Semaphore 采用的就是基于 AQS 的共享模式实现的。 而独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如 ReentranLock。

AbstractQueuedSynchronizer waitStatus 取值

常量

意义

CANCELLED

1

同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,节点终极状态

SIGNAL

-1

等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行

CONDITION

-2

该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁

PROPAGATE

-3

在共享模式中,该状态标识结点的线程处于可运行状态

0

0

初始状态

5. AbstractQueuedSynchronizer 的模板方法

AbstractQueuedSynchronizer 类作为抽象的基础框架类,通过定义模板方法的方式提供了一套实现锁的模板,其最基本的锁实现方式需要子类复写模板:

代码语言:javascript
复制
protected boolean tryAcquire(int arg);            // 获取独占锁
protected boolean tryRelease(int arg);            // 释放独占锁
protected int tryAcquireShared(int arg);        // 获取共享锁
protected boolean tryReleaseShared(int arg);    // 释放共享锁
protected boolean isHeldExclusively();            // 判断是否持有独占锁

ReentrantLock、Semaphore、CountDownLatch 等工具都是通过复写上述若干方法实现的。

6. 独占模式获取锁操作 — acquire

AbstractQueuedSynchronizer 通过 acquire 方法获取锁。 下图是整个加锁过程的流程图。

通过整个流程,我们看到,独占模式获取锁的原则就是:

  • 获取锁失败的节点插入队尾
  • 队首不存储任何信息
  • 队首的后继非 CANCELLED 状态节点是唯一有权利获取锁的节点

6.1. acquire 方法

代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 方法调用了我们上面已经提到的需要子类复写的获取独占锁方法 tryAcquire,框架只定义了获取失败后如何处理 — addWaiter。

6.2. 获取锁失败后入队操作 — addWaiter

代码语言:javascript
复制
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

这是一个典型的双向队列节点入队操作,毋庸细说,他通过我们已经讲到过的 CAS 操作实现了尾节点的判断和更新。

代码语言:javascript
复制
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

而兜底方法 enq 有两种可能被调用。 1. 当前队列为空,尚没有任何元素存在 2. 并发环境下,原子操作 compareAndSetTail 失败

因此,enq 方法中承担了两部分工作的处理:

代码语言:javascript
复制
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

可以看到,enq 方法中通过原子操作和循环的方式实现了并发环境下尾节点的添加,之所以这部分逻辑从 addWaiter 中抽离,是因为大部分情况下是不会出现上述两种可能的,将正常业务逻辑与特殊且不常用业务逻辑分离,是值得学习的一种代码组织方式。

6.3. 重新获取锁及更新节点状态 — acquireQueued

如上所述,并发环境下有可能在插入列表之前尚需要等待锁,但在插入列表后,马上又可以获取到锁,因此此时再次获取锁就可以减少不必要的等待。 于是,在上述 addWaiter 方法执行完,又执行了 acquireQueued 方法。

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋操作
        for (;;) {
            // 获取当前插入节点的前置节点
            final Node p = node.predecessor();
            // 如果当前节点的前置节点是 head 则重新获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果需要线程进入等待,则等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

最终,通过 shouldParkAfterFailedAcquire 方法判断是否需要让线程挂起,如果需要挂起,则调用 parkAndCheckInterrupt 方法挂起线程。 线程的挂起最终是通过 Unsafe 的 static native 方法 park 实现的。

6.4. 节点出队 — cancelAcquire

最终,无论是线程获取锁过程中发生异常还是超时唤醒,都需要将当前 node 出队,这就是 cancelAcquire 方法做的事。

代码语言:javascript
复制
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

可以看到,上面这段代码处理了三种情况: 1. node 是队尾节点 2. node 的前驱不是队首节点,node 也不是队尾节点 3. node 的前驱是队首节点

其中,对于情况 1 和情况 2,都进行了 node 的出队,但是第三种情况却没有执行出队方法,这是为什么呢? 原因是当前节点已经被标记为 CANCELLED 状态,那么,在后继节点执行 shouldParkAfterFailedAcquire 方法时,会先让其前驱节点,也就是我们当前的 node 节点出队。

6.5. 唤醒后继节点 — unparkSuccessor

由上所述,当 node 是队首节点的后继时,执行了 unparkSuccessor 方法。 当前节点是队列中唯一等待锁的节点,所以必须让出获取锁的权限,让他的后继去获取锁,这就是 unparkSuccessor 方法做的事情。

代码语言:javascript
复制
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

代码通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒了该节点对应的线程。 LockSupport.unpark 最终调用了 UNSAFE 类的 unpark 方法。

代码语言:javascript
复制
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

那么,被唤醒的线程执行了什么呢?我们要找到该线程是什么时候被挂起的,那就是在上述的 acquireQueued 方法中,我们回看 acquireQueued 方法,被唤醒的线程在执行完 Thread.interrupted() 方法后继续循环,尝试获取锁,从而保证了锁的独占与竞争。 parkAndCheckInterrupt 方法并没有对 Thread.interrupted() 的返回值做任何处理,由此可见,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的。

7. 可中断独占锁加锁 — acquireInterruptibly

上面提到,acquire 方法获取锁失败的线程是不能被 interrupt 方法中断的,AQS 还提供了另一个方法 acquireInterruptibly 加锁,从而让获取锁失败等待的线程可以被中断。

代码语言:javascript
复制
public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

7.1. 可中断独占锁加锁失败处理 — doAcquireInterruptibly

可以看到,这里获取锁失败后调用了 doAcquireInterruptibly 方法,doAcquireInterruptibly 方法与 acquire 的结构非常像:

代码语言:javascript
复制
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到,这里仍然是通过独占模式获取锁,但是在 parkAndCheckInterrupt 返回后,与此前 acquire 方法中继续循环的方式不同,取而代之的,抛出了 InterruptedException 异常,中断线程执行。

8. 独占锁的解锁 — release

独占锁的解锁较为简单,因为加锁成功后,该线程对应的节点已经从同步队列中移除,此处如果解锁成功,只需要唤醒下一个节点去竞争锁即可。

代码语言:javascript
复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这里仍然调用了 unparkSuccessor 方法,通过循环的方式,找到了队列中 node 节点后面第一个状态不为 CANCELLED 的节点,执行 LockSupport.unpark 唤醒线程。

9. 共享锁的加锁 — acquireShared

从图上可以看到,共享锁的加锁主要做了下面三项工作: 1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。 2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。 3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

9.1. 获取锁传递与唤醒 — setHeadAndPropagate

整个流程与独占锁的加锁流程非常类似,最大的区别就是 setHeadAndPropagate 方法,这个方法做的事情就是我们上面提到的:

  • 如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待
代码语言:javascript
复制
//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; //记录当前头节点
    //设置新的头节点,即把当前获取到锁的节点设置为头节点
    //注:这里是获取到锁之后的操作,不需要并发控制
    setHead(node);
    //这里意思有两种情况是需要执行唤醒操作
    //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
    //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
        //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

9.2. 唤醒后继节点 — doReleaseShared

doReleaseShared 方法进行了后续节点的唤醒。

代码语言:javascript
复制
private void doReleaseShared() {
    for (;;) {
        //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
        //其实就是唤醒上面新获取到共享锁的节点的后继节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;      
                //执行唤醒操作      
                unparkSuccessor(h);
            }
            //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        //如果头结点没有发生变化,表示设置完成,退出循环
        //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
        if (h == head)                   
            break;
    }
}

10. 共享锁的解锁 — releaseShard

我们再来看共享锁的解锁操作。

代码语言:javascript
复制
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

结合上面我们已经介绍过的 doReleaseShared 方法源码,共享锁的解锁非常简单,与独占模式的解锁一样,他在解锁成功以后进行了后续节点的唤醒操作,从而保证锁的竞争。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-08-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. AQS — Abstract Queued Synchronizer
  • 3. 类图
  • 4. Node
  • 5. AbstractQueuedSynchronizer 的模板方法
  • 6. 独占模式获取锁操作 — acquire
    • 6.1. acquire 方法
      • 6.2. 获取锁失败后入队操作 — addWaiter
        • 6.3. 重新获取锁及更新节点状态 — acquireQueued
          • 6.4. 节点出队 — cancelAcquire
            • 6.5. 唤醒后继节点 — unparkSuccessor
            • 7. 可中断独占锁加锁 — acquireInterruptibly
              • 7.1. 可中断独占锁加锁失败处理 — doAcquireInterruptibly
              • 8. 独占锁的解锁 — release
              • 9. 共享锁的加锁 — acquireShared
                • 9.1. 获取锁传递与唤醒 — setHeadAndPropagate
                  • 9.2. 唤醒后继节点 — doReleaseShared
                  • 10. 共享锁的解锁 — releaseShard
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档