前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS之共享锁

AQS之共享锁

作者头像
spilledyear
发布2020-01-02 23:40:09
6150
发布2020-01-02 23:40:09
举报
文章被收录于专栏:小白鼠小白鼠

通过 AQS独占锁,我们对AQS的数据结构有了基本的了解。它本质上就是一个优化过的CLH队列,因为CLF队列只有一个前驱指针,而AQS除了前驱指针,还有一个后驱指针。先简单总结一个AQS的特性

  1. 双向链表,有头节点和尾节点,FIFO,尾进头出,每个线程会被封装成一个Node
  2. 状态state,被volatile关键字修饰
  3. 独占模式下:获取锁后state值加1,释放锁后state值减1,通过CAS原子操作加减,state==0表可以获取锁,state>1代表锁重入
  4. 共享模式下,state>0代表可以获取锁,同步器初始化的时候,会给sate设置一个初始化,这个值代表同时允许多少个线程获取锁
  5. 共享模式下, tryAcquireShared返回值的特点是:小于0代表获取锁失败;等于0代表本次获取锁成功,但随后的获取将返回失败,也就是此刻这是共享模式下的最后一把锁,除非接下来有人释放锁,否则你获取不了;大于0代表本次获取锁成,并且接下来也可以获取锁
  6. 每个Node内部有一个nextWaiter属性,表示该节点是独占模式还是共享模式,独占为EXCLUSIVE,独占为SHARED
  7. 每个Node内部有一个waitStatus属性, 这个字段的取值有以下可能: SIGNAL(阻塞) CANCELLED(取消排队) CONDITION(条件等待) PROPAGATE(共享模式下用到) 0(如果没有给它设置状态,默认为0)
  8. 独占模式下,当获取锁成功时,该线程对应的节点(head的下一个节点)会升级为head节点,然后将原head从队列中脱离help GC;当获取锁失败时,会向CLH队列尾部添加一个节点,同时通过自旋将其前驱节点的waitStatus属性设置为SIGNAL,然后通过LockSupport将该节点对应的线程阻塞
  9. 独占模式下,当释放锁的时,先获取head节点的下一个节点,如果不为null,则通过LockSupport将该节点对应的线程解除阻塞;如果为null,则通过前驱指针反向遍历找到该节点,然后通过LockSupport将该节点对应的线程解除阻塞。为什么要通过前驱指针遍历呢?因为AQS的后驱指针在极限情况下是不可靠的,但很多时候可以通过后置指针达到优化的效果:添加节点的时候,当CAS成功但在设置后置指针之前,此时后置指针为null;给前驱节点设置SIGNAL状态的时候,会保证其前驱节点是一个有效的节点(非取消状态),如果为取消状态,则找其前驱的前驱
  10. 共享模式下,当一个线程获得锁的时候,会调用setHeadAndPropagate方法,如果此时同步器中还有可用的锁,则会调用doReleaseShared方法唤醒下一个节点,这就是传播
  11. 共享模式下, 当一个线程释放锁的时候,会调用doReleaseShared方法,该方法会唤醒head节点的下一个节点,而唤醒的节点在通过自旋获得锁后,会调用setHeadAndPropagate方法,如果此时同步器中还有可用的锁,则会继续调用doReleaseShared方法唤醒下一个节点

下面以CountDownLatch讲解AQS共享锁

使用示例

代码语言:javascript
复制
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 80);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 50);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();
new Thread(() -> {
    sleep(TimeUnit.MILLISECONDS, 60);
    System.out.println(Thread.currentThread().getName() + " Finished");
    countDownLatch.countDown();
}).start();

countDownLatch.await();
System.out.println("All Finished");
}


--------------------------------------------结果---------------------------------------------------
Thread-1 Finished
Thread-2 Finished
Thread-0 Finished
All Finished

CountDownLatch的用法:比如将一个任务分成3个小任务,然后在主线程上等待所有任务完成,这时可以使用CountDownLatch。在构造函数中传入3,在AQS共享模式下state == 3代表同时可以有3个线程获取锁,在CountDownLatch代表有三个线程调用CountDownLatch#countDown方法后,调用CountDownLatch.await()的线程将不再阻塞。在上面例子中,每个小任务完成时调用CountDownLatch#countDown方法,然后在主线程上调用CountDownLatch#countDown,这样就达到我们想要的效果了。

CountDownLatch

代码语言:javascript
复制
CountDownLatch countDownLatch = new CountDownLatch(3);

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}
  1. 和其他的同步器类似,在CountDownLatch中有一个内部类Sync,它继承了AbstractQueuedSynchronizer抽象类,它是实现同步器的关键
  2. CountDownLatch中,会根据我们传入的count值,调用AbstractQueuedSynchronizer#setState方法,即最终AQS中state == count

CountDownLatch#await

代码语言:javascript
复制
// CountDownLatch#await
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

和之前的独占模式一样,还是模板模式,也就是说,想实现一个基于共享锁的同步器,只需要重写tryAcquireSharedtryReleaseShared方法。

  1. 根据acquireSharedInterruptibly方法名可以知道,这是一个可以响应中断的方法,如果线程发生中断,则抛出InterruptedException
  2. tryAcquireShared方法的主要作用就是当state==0时返回1,否则返回-1,我们知道CountDownLatch构造函数执行完成之后,AQS中state的值为3,那state的值什么情况下会变为0呢?其实不难猜出应该是在调用CountDownLatch#countDown方法时会改变state的值,这一块内容我们接下来再去验证。也就是说,在没有其他处理的情况下,此时tryAcquireShared方法会返回-1
  3. acquireSharedInterruptibly方法中的if条件成立,所以接下来会执行doAcquireSharedInterruptibly方法

AbstractQueuedSynchronizer#doAcquireSharedInterruptibly

代码语言:javascript
复制
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 创建共享节点,注意这里的 Node.SHARED,然后将其添加到队列尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 自旋获取锁
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点时head,则直接获取锁
            if (p == head) {
                // 尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 尝试获取锁成功,需要重设设置头节点,这里面还有传播操作,等下重点关注
                    setHeadAndPropagate(node, r);

                    // 将之前的头节点从队列中删除
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            // 设置前驱节点的状态为 SIGNAL 并且阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 与独占模式相同点:先创建节点添加到尾部、获取锁失败时修改前驱节点的状态为SIGNAL并且阻塞当前线程
  2. 与独占模式相同点:独占模式下如果检查到线程发生中断了,仅仅返回一个标识位,这里时直接抛出异常;独占模式下;独占模式下如果获取锁成功,仅仅是更新一下head节点然后返回,而共享模式下是调用了setHeadAndPropagate方法

那么,这个setHeadAndPropagate方法是干嘛用的呢?有必要认真看看

AbstractQueuedSynchronizer#setHeadAndPropagate

代码语言:javascript
复制
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 设置当前节点为head节点,和独占模式下一样
    setHead(node);

    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
    */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 唤醒下一个节点,
            doReleaseShared();
    }
}

咋一看觉得这段代码是比较难理解的,除了最开始的setHead和独占模式一样,表示设置当前节点为head节点以外,其他的代码看起来有点迷糊。其实这时候我们得从AQS共享模式的一个特点去理解它。

  1. tryAcquireShared返回值特点:小于0代表获取锁失败;等于0代表本次获取锁成功,但随后的获取将返回失败,也就是此刻这是共享模式下的最后一把锁,除非接下来有人释放锁,否则你获取不了;大于0代表本次获取锁成,并且接下来也可以获取锁
  2. propagatetryAcquireShared方法的返回值,如果propagate>0,则说明共享模式下还有锁可以获取,这时候如果队列中有排队的节点,应该通知它们,这就是传播。那怎么通知呢?调用doReleaseShared方法
  3. head==null || head.waitStatus<0又是对应什么场景呢?接下来再说

有关于CountDownLatch#await方法,到这里我们可以放一放,当然还存在一些疑问,先记下来

  1. head==null || head.waitStatus<0对应什么场景?
  2. 为什么要判断两遍呢?即 (h = head) == null || h.waitStatus < 0)
  3. node.next == null || node.next.isShared() 又是什么意思?

CountDownLatch#countDown

其实前面已经猜测过,CountDownLatch#countDown方法应该会改变state的值,同样还是模板模式

代码语言:javascript
复制
// CountDownLatch#countDown
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync#tryReleaseShared
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
  1. tryReleaseShared表示尝试获取锁,如果获取成功,此时需要通知head节点的后驱节点,并对该后驱节点解除阻塞。Sync#tryReleaseShared方法中向判断state是否等于0,如果是则直接返回false,然后设置state = state-1,然后再返回state是否等于。为什么有这么一段逻辑呢?这其实是和同步器的特性相关,对我在CountDownLatch的构造函数中传入3时,表示我们在3个线程上调用CountDownLatch#countDown方法后,调用CountDownLatch#await的线程将解除阻塞。就是3个,再多几个调用也没有什么效果,所以这里直接返回false
  2. 在3个线程调用CountDownLatch#countDown方法后,AbstractQueuedSynchronizer#releaseShared中的if条件将返回true,此时将调用AbstractQueuedSynchronizer#doReleaseShared方法,表示对head的后驱节点解除阻塞,其实还涉及到一些传播的逻辑

AbstractQueuedSynchronizer#doReleaseShared

代码语言:javascript
复制
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果 head.waitStatus == Node.SIGNAL,说明它的后驱节点正被阻塞,在添加节点的时候会改变前驱节点的状态
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 如果 head.waitStatus == Node.SIGNAL,则唤醒head.next节点,和独占模式一样
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

这一块内容和setHeadAndPropagate方法一起看会更好一些,我把代码也贴上

代码语言:javascript
复制
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
下面开始模拟执行doReleaseShared方法的极限情况

(1). 假设队列中有两个节点A->B->C->D A:head D:tail

(2). threa1代表执行doReleaseShared的线程,thread2代表节点B线程,thread3代表节点C线程,thread4代表节点D线程,最开始执行doReleaseShared方法的时候,thread2 thread3 thread4是被阻塞的

  1. thread1第一次进入循环,h != null && h != tail 成立,然后开始执行unparkSuccessor方法唤醒thread2,unparkSuccessor方法执行完成之后即代表thread2被唤醒,此时thread1thread2同时在运行,这一时刻,head的状态为0
  2. thread1继续往下执行,虽然此时head.waitStatus ==0,但是变量ws的值是在compareAndSetWaitStatus(h, Node.SIGNAL, 0)方法执行之前赋值,所以此时ws==SIGNAL,所以此时else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))条件不成立,然后判断h == head是否成立 2.1 如果此时thread2还没有获取锁成功,即自己还没有成功升级为head,则h == head成立,此时thread1直接退出,接下来只需要thread2通过自旋获取锁成功就可以了,已经没有thread1什么事了,那有人会问,如果同步器里还可以获取锁,现在只唤醒一个节点B,线程threa1就直接退出了,后面的C、D节点怎么唤醒呢,这个交给setHeadAndPropagate 2.2 如果此时thread2获取锁成功,即自己已经成功升级为head,则h == head不成立,此时thread1进入下一轮循环;同时thread2会执行setHeadAndPropagate方法,如果在同步器有锁得情况下,thread2还会执行doReleaseShared,所以此时有可能两个线程同时执行doReleaseShared
  3. thread1thread2同时执行doReleaseShared方法,因为此时新head是节点B,状态为SIGNALcompareAndSetWaitStatus(h, Node.SIGNAL, 0)只有一个线程能执行成功 3.1 如果thread1执行成功,则会唤醒下一个节点,即节点C,如果节点C在升级head成功之后判断同步器中还有锁,节点C所在线程thread3会继续唤醒下一个节点,所以此时可能有3个线程在同时执行doReleaseShared方法 3.2 在thread1执行成功的时候,thread2可能会在新一轮循环退出:在执行Node h = headif (h == head)这两行代码时head未发生变化;也可能不退出:即head发生变化,此时有可能thread1 thread2 thread3 同时在竞争执行compareAndSetWaitStatus(h, Node.SIGNAL, 0) 方法,具体谁能成功,谁也说不准
  4. 在同步器有锁的情况下,如果 A B C D 4个节点都被唤醒了,说明此时队列中只剩下一个head,即节点D。当然,此时可能 A B C D 节点对应的线程可能都在执行doReleaseShared方法,但是没有关系,因为新一轮的循环条件if (h != null && h != tail) && h == head会导致它们退出
  5. 但考虑这种一种情况,加入有3个线程都正常退出了,然后在线程thread4执行if (h != null && h != tail)之前,队列中添加了一个新的节点,即节点E,这时候thread4会重新进入循环,这时候head(即D节点)的状态有两种情况 5.1 节点E添加到尾部成功,并且已经修改了其前驱节点(节点D)的状态为SIGNAL,也就是此时head(即D节点)的状态 == SIGNAL, 这时候thread4会唤醒节点E,接下来的流程和上面一样,就不分析了 5.2 节点E添加到尾部成功,但还没来得急修改其前驱节点(节点D)的状态为SIGNAL,也就是此时head(即D节点)的状态 == 0,这时候thread4会进入到else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))判断
  6. ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)什么情况下会不成立呢? 6.1 在多个线程执行doReleaseShared的时候,加入3个线程执行doReleaseShared,第一个线程成功执行了unparkSuccessor方法,那剩下两个线程有可能在新一轮循环中并发执行else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),这种情况我们占不考虑 6.2 即在执行ws == 0!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))这两行代码中间,head节点的状态被改掉了。而我们知道,在添加节点的时候会改变前驱节点的状态为SIGNAL,所以在节点E对应线程自旋设置head状态,可能导致这里不成立

虽然只有几行代码,但在不了解作者意图的情况下,真的好难看懂,做一个总结

  1. 首先要理解共享锁的含义,共享锁代表在同一时刻可以有多个线程获取锁,具体有几个线程由用户自己决定;而独占锁代表同一个时刻只能由一个线程获取锁
  2. 即然同一时刻可以有多个线程获取锁,那在释放锁的时候,怎么尽快的唤醒其它阻塞的节点呢?这就涉及到共享锁的传播
  3. 当一个线程获得锁的时候,会调用setHeadAndPropagate方法,如果此时同步器中还有可用的锁,则会调用doReleaseShared方法唤醒下一个节点,这就是传播
  4. 当一个线程释放锁的时候,会调用doReleaseShared方法,该方法会唤醒head节点的下一个节点,而唤醒的节点在通过自旋获得锁后,会调用setHeadAndPropagate方法,如果此时同步器中还有可用的锁,则会继续调用doReleaseShared方法唤醒下一个节点
  5. 有点相互调用的感觉,都是为了在同步器中还有可用的锁,让阻塞的线程尽快获取锁
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用示例
  • CountDownLatch
  • CountDownLatch#await
    • AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
      • AbstractQueuedSynchronizer#setHeadAndPropagate
      • CountDownLatch#countDown
        • AbstractQueuedSynchronizer#doReleaseShared
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档