前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >锁的等待与唤醒 -- ConditionObject 源码解析

锁的等待与唤醒 -- ConditionObject 源码解析

作者头像
用户3147702
发布2022-06-27 12:43:42
3360
发布2022-06-27 12:43:42
举报
文章被收录于专栏:小脑斧科技博客

1. 概述

此前在介绍 ReentrantLock 用法时,我们介绍了 ReentrantLock 与 Condition 的用法,类似于 Object 提供的 notify、notifyAll 方法来让线程进入等待与唤醒,那么这是如何实现的呢? 在介绍 AQS 源码时,我们提到,AQS 维护了两个队列 — 同步队列和等待队列,到现在为止,我们仅仅使用了 AQS 的同步队列,却从没有使用过 AQS 的等待队列,那么 AQS 等待队列究竟是如何实现的呢? 本篇日志我们就来介绍一下 ReentrantLock 的 Condition 的实现,他就是通过 AQS 的等待队列来实现的。

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁

2. Condition 的创建 — newCondition

代码语言:javascript
复制
public Condition newCondition() {
    return sync.newCondition();
}

可以看到,他直接调用了内部类 Sync 的 newCondition 方法。 那我们来看看 Sync 的 newCondition 方法又做了什么呢?

代码语言:javascript
复制
final ConditionObject newCondition() {
    return new ConditionObject();
}

他直接新建了一个 AbstractQueuedSynchronizer 类的内部类 ConditionObject。

3. ConditionObject

ConditionObject 中维护了一个双线链表,也就是上面提到的 AQS 等待队列,有两个成员分别指向双向链表的首尾。

代码语言:javascript
复制
private transient Node firstWaiter;
private transient Node lastWaiter;

而 Node 对象就是我们熟悉的 AQS 同步队列节点:

代码语言:javascript
复制
static final class Node {
    //标志Node的状态:独占状态。
    static final Node SHARED = new Node();
    //共享状态
    static final Node EXCLUSIVE = null;

    //因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态;
    //同步队列中使用
    static final int CANCELLED =  1;
    //该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。
    //同步队列中使用
    static final int SIGNAL    = -1;
    //CONDITION队列中的状态,同步队列中节点没有该状态,当将一个node从CONDITION队列中transfer到同步队列中时,状态由CONDITION转换成0
    //同步队列不使用,CONDITION队列中使用
    static final int CONDITION = -2;
    //该状态表示下一次节点如果是Shared的,则无条件获取锁。
    //同步队列中使用
    static final int PROPAGATE = -3;

    //当一个新的node在同步队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态
    volatile int waitStatus;

    //队列中的前驱节点
    //CONDITION队列中使用
    volatile Node prev;

    //同步队列中使用,指向下一个节点的引用
    volatile Node next;

    //当前线程
    volatile Thread thread;

    //CONDITION队列中指向下一个node的指针,同步队列中不使用
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //返回前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

4. 出让锁所有权,等待 — await

此前我们已经介绍过,在线程获取锁以后,通过 Condition 对象的 await 方法可以让线程挂起,并暂时释放锁,直到其他线程调用该 Condition 对象的 signal 方法或 signalAll 方法或线程被中断。

代码语言:javascript
复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

下面是整个过程的流程图:

通过流程图我们可以看到,整个流程是非常清晰的,简单的来说就是将线程对应的 AQS Node 节点从 AQS 的同步队列中移动到 Condition 对象维护的等待队列的队尾,然后释放锁,挂起线程。 但是,这里存在一个问题,当线程被唤醒的时候,为什么检测到线程被中断不直接抛出 InterruptedException 而是要等到获取锁成功之后呢?因为 await 方法的原则是,怎么进来的怎么出去,他只是让线程休眠一段时间,他不会因为线程被中断就让本来持有锁的线程在退出方法时没有持有锁,这样的话,用户去释放锁的时候就会抛出意外的异常 java.lang.IllegalMonitorStateException,这显然是不合理的。

5. 带有超时时间的 await

这三个方法与 await 方法做了相同的事情,那就是让出锁的所有权,进入等待,但是他们的独特之处在于,你可以定义让出锁所有权的最长等待时间。 他们三个方法的源码非常相似,与不带参数的 await 方法的区别仅仅是使用 LockSupport.parkNanos(this, nanosTimeout) 来实现线程的挂起,同时,超时的时候强制将 Condition 队列中的 Node 节点放入到 AQS 的同步队列中。

5.1. await(long time, TimeUnit unit)

代码语言:javascript
复制
public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

返回被唤醒时是否已经超时。

5.2. awaitNanos(long nanosTimeout)

代码语言:javascript
复制
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

与上面的 await(long time, TimeUnit unit) 方法相比,几乎是一模一样的,仅仅是去掉了 TimeUnit 转纳秒数的部分,同时返回了剩余的纳秒数。

5.3. awaitUntil

代码语言:javascript
复制
public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6. 最简单的 await — awaitUninterruptibly

awaitUninterruptibly 是最简单的 await 方法了,他没有对 interrupted 方法的返回结果做任何处理。

代码语言:javascript
复制
public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

7. 唤醒沉睡中的节点 — signal

代码语言:javascript
复制
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal 的流程比较简单,主要就做了下面三件事。 1. 权限校验 2. 将节点从 Condition 队列放入 AQS 同步队列 3. 唤醒线程

8. 唤醒所有沉睡中的节点 — signalAll

代码语言:javascript
复制
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

我们可以看到,和 signal 从流程上几乎是一样的,所有节点被逐个添加到 AQS 同步队列尾部。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-08-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. Condition 的创建 — newCondition
  • 3. ConditionObject
  • 4. 出让锁所有权,等待 — await
  • 5. 带有超时时间的 await
    • 5.1. await(long time, TimeUnit unit)
      • 5.2. awaitNanos(long nanosTimeout)
        • 5.3. awaitUntil
        • 6. 最简单的 await — awaitUninterruptibly
        • 7. 唤醒沉睡中的节点 — signal
        • 8. 唤醒所有沉睡中的节点 — signalAll
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档