前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS --- 融会贯通

AQS --- 融会贯通

作者头像
贪挽懒月
发布2021-07-23 16:16:37
2990
发布2021-07-23 16:16:37
举报
文章被收录于专栏:JavaEEJavaEE

一、ReentrantLock 加锁过程简介

加锁可以分为三个阶段:

  • 尝试加锁;
  • 加锁失败,线程入AQS队列;
  • 线程入队列后进入阻塞状态。

二、ReentrantLock 加锁源码分析

现有如下场景:

三个人去银行的一个窗口办理业务,一个窗口同一时刻只能接待一位顾客。抽象成代码就是:

代码语言:javascript
复制
public static void main(String[] args){
    ReentrantLock lock = new ReentrantLock();
    new Thread(() -> {
        lock.lock();
        try {
            System.out.println("顾客A办理业务");
            TimeUnit.MINUTES.sleep(5);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "A").start();

    new Thread(() -> {
        lock.lock();
        try {
            System.out.println("顾客B办理业务");
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "B").start();

    new Thread(() -> {
        lock.lock();
        try {
            System.out.println("顾客C办理业务");
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "C").start();
}

1. lock 方法:

调用lock.lock()的时候,实际上调用的是 NonfairSync 的 lock 方法,如下:

代码语言:javascript
复制
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

compareAndSetState(0, 1) 这里比较并交换,期望 AQS 中的 state 的值是0,如果是,就将其修改成1,返回 true ,否则返回 false 。

  • 返回 true 的时候,执行 setExclusiveOwnerThread(Thread.currentThread()) 把占用资源的线程设置成当前线程。当线程A进来的时候,因为 AQS 中的 state 还没别的线程去修改,所以是0,就会成功修改成 1,就直接加锁成功了。
  • 返回 false 的时候,就会进入 esle 块中,执行 acquire(1) 方法。假如线程A加锁成功还没释放锁的时候,线程B进来了,那么就会返回 false 。 acquire(1) 方法又主要包括三个方法,源码如下:
代码语言:javascript
复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

接下来依次来看这三个方法。

2. tryAcquire(arg) 方法:

代码语言:javascript
复制
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

再点进去看 nonfairTryAcquire(acquires)

代码语言:javascript
复制
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

线程B进到这段代码就有三种情况:

  • currentThread 就是线程B,c 就是 AQS 中的 state,即1,c 不等于0,所以跑到 else if 中,判断当前线程和持有锁的线程是否相同。current 是 B,而当前持有锁的是 A,也不相等,所以直接 return false。
  • 如果线程B进来的时候 A 刚好走了,即 c 等于0,那么进到 if 里面。if 里面做的事就是再将 state 改成1,同时设置当前占有锁的线程为 B,然后返回 true;
  • 如果当前线程等于当前占有锁的线程,即进来的还是线程A,那么就修改 state 的值(当前值加1),然后返回 true。

3. addWaiter(Node.EXCLUSIVE) 方法:

上面说了 tryAcquire(arg) 方法,当该方法返回 false 的时候,就会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,那么先看它里面的 addWaiter(Node.EXCLUSIVE) 方法,源码如下:

代码语言:javascript
复制
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

无特殊说明的时候,以下情况基于:当前持有锁的是线程A,并且还没释放,进来的是线程B。

这个方法首先将线程B封装成 Node,传进来的 mode 是 AQS 中的一个属性,还没哪里赋过值,所以是 null,当前的 tail 其实也是 null,因为 AQS 队列中现在还没别的等待线程。是 null 的话,就执行入队方法 enq(node),该方法如下:

代码语言:javascript
复制
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

for (; ;) 其实就是相当于 while(true),进行自旋。当前的 tail 是 null ,所以进入 if 中,这里有个 compareAndSetHead(new Node()) 方法,这里是 new 了一个节点,姑且叫它傀儡节点,将它设置为头结点,如果 new 成功了,尾结点也指向它。效果如下图:

傀儡节点

第二次循环的时候,t 就是不再是 null 了,而是指向刚才那个傀儡节点了,所以进入 else 中。else 中做的事就是,将传进来的节点,即封装了线程B的节点 node,将其 prev 设置成刚才new 的那个傀儡节点,再将 tail 指向 封装了线程B的 node;再将傀儡节点的 next 指针指向封装了线程B的 node,如下图:

node入队

当线程C进到 addWaiter(Node mode) 方法,此时 tail 不是 null 了,已经指向了 线程B的节点,所以进入到 if 块里面,执行:

代码语言:javascript
复制
if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}

这里就是将线程C的 prev 设置为当前的 tail,即线程B的 node,然后将线程C设置为 tail,再将线程B的 next 设置为线程C。最后效果图如下:

线程C入队

4. acquireQueued(final Node node, int arg) 方法:

再来看看这个方法:

代码语言:javascript
复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个固定的,假如传进来的 node 是线程B,首先会进入自旋,看看 predecessor 这方法:

代码语言:javascript
复制
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

首先让 p 等于 prev,此时线程B 节点的 prev是谁呢,直接看我上面画的图可以知道,线程B的 prev 就是傀儡节点,所以这里 return 的就是傀儡节点。

回到外层,傀儡节点等于 head,所以又会执行 tryAcquire(arg),即又重复上述步骤再次尝试获取锁。因为此时线程A持有锁,所以线程B尝试获取锁会失败,即 tryAcquire(arg) 会返回 false。

返回 false, 那就执行下一个 if。首先看 shouldParkAfterFailedAcquire(p, node) 方法:

代码语言:javascript
复制
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

prev 是傀儡节点,它的 waitStatus 是0,因为傀儡节点 new 出来以后还没改过它的 waitStatus 的值,默认是0。Node.SIGNAL 的值是 -1,不相等,0 也不大于 0,所以进入 else,执行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL),这是比较并交换,将傀儡节点的 waitStatus 的值由 0 改为 -1,最后返回了 false。

shouldParkAfterFailedAcquire(p, node) 方法返回了 false,因为自旋,所以又回到 final Node p = node.predecessor(); 这一行。此时 p 节点还是傀儡节点,再去尝试获取锁,如果线程A还是释放,又获取失败了,就会再次执行 shouldParkAfterFailedAcquire(p, node) 方法。

再次执行 shouldParkAfterFailedAcquire(p, node) 的时候,傀儡节点的 waitStatus 就已经是 -1 了,所以直接 return true。

这里 return 了 true,就会执行 parkAndCheckInterrupt() 方法,看看这个方法源码:

代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

这里的 this 就是线程B,这里调用了 park 方法,就让线程B 在等着了。线程C进来也一样,执行到这一步,就会调用 park 方法,一直在等着。当线程A释放锁了,就会调用 unpark 方法,线程B和线程C就有一个可以抢到锁了。

5. unlock 方法:

当线程A调用了 unlock 方法,实际上调用的是:

代码语言:javascript
复制
 public void unlock() {
    sync.release(1);
}

点进去之后是这样的:

代码语言:javascript
复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

再点进去看 tryRelease(arg 方法:

代码语言:javascript
复制
 protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

线程A getState 是 1,传进来的 releases 也是 1,所以相减结果就是 0。因为结果是 0,所以会将 free 改成 true,然后调用 setExclusiveOwnerThread(null); 将当前持有锁的线程设置为 null。然后设置 AQS 的 state 等于 c,即等于 0,最后该方法 return true。

回到上一层,此时的 head 是傀儡节点,不为空,并且傀儡节点的 waitStatus 刚才改成了 -1,不等于 0,所以会调用 unparkSuccessor(h); 方法:

代码语言:javascript
复制
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

这里传进来的节点是傀儡节点,它的 waitStatus 是 -1,小于 0,所以就会执行 compareAndSetWaitStatus(node, ws, 0) 进行比较并交换,又将傀儡节点的 waitStatus 改成了 0。

继续往下执行,得到的 s 就是线程B所在节点,不为空,并且线程B节点的 waitStatus 还没改过,是 0,所以直接执行 LockSupport.unpark(s.thread)

因为调用了 unpark,所以刚才阻塞的线程B在 acquireQueued(final Node node, int arg) 方法中的自旋就继续进行,就会调用 tryAcquire(arg) 方法,这次因为A已经释放锁了,所以该方法会返回 true,就会执行 if 里面的代码:

代码语言:javascript
复制
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

看看 setHead(node) 方法:

代码语言:javascript
复制
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

这里的 node 就是线程B节点,将头结点指向线程B节点,将线程B节点的线程设置为空,前驱设置为空。外层再把傀儡节点的 next 指针设置为空,所以最终效果就是:

傀儡节点出队

最终是傀儡节点出队,以前线程B所在节点成为新的傀儡节点。因为之前的傀儡节点已经没有任何引用指向它了,就会被 GC 回收。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、ReentrantLock 加锁过程简介
  • 二、ReentrantLock 加锁源码分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档