前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【10张图】管程内部,进去看看

【10张图】管程内部,进去看看

作者头像
jinjunzhu
发布2022-09-23 09:02:08
1810
发布2022-09-23 09:02:08
举报
文章被收录于专栏:个人开发

java对共享变量的操作管理使用了MESA管程模型。下图是Java基于AQS实现的MESA管程模型:

上图中有三个知识点:

  • MESA管程模型封装了共享变量和对共享变量的操作,线程要进入管程内部,必须获取到锁,如果获取锁失败就进入入口等待队列阻塞等待。
  • 如果线程获取到锁,就进入到管程内部。但是进入到管程内部,也不一定能立刻操作共享变量,而是要看条件变量是否满足,如果不满足,只能进入条件变量等待队列阻塞等待。
  • 在条件变量等待队列中,如果被其他线程唤醒,也不一定能立刻操作共享变量,而是需要去入口等待队列重新排队等待获取锁。

本文主要讲解管程模型中条件变量等待队列。

1 官方示例

首先我们看一下官方给出的示例代码:

代码语言:javascript
复制
public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

这个代码定义了两个条件变量,notFull和notEmpty,说明如下:

  1. 如果items数组已经满了,则notFull变量不满足,线程需要进入notFull条件等待队列进行等待。当take方法取走一个数组元素时,notFull条件满足了,唤醒notFull条件等待队列中等待线程。
  2. 如果items数组为空,则notEmpty变量不满足,线程需要进入notEmpty条件等待队列进行等待。当put方法加入一个数组元素时,notEmpty条件满足了,唤醒notEmpty条件等待队列中等待线程。
  3. 条件变量是绑定在Lock上的,示例代码使用了ReentrantLock。在执行await和signal方法时首先要获取到锁。

2 原理简介

Java AQS的条件变量等待队列是基于接口Condition和ConditionObject来实现的,URM类图如下:

Condition接口主要定义了下面3个方法:

  • await:进入条件等待队列
  • signal:唤醒条件等待队列中的元素
  • signalAll:唤醒条件等待队列中的所有元素

3 await

条件等待队列跟入口等待队列有两个不同:

  • 虽然二者共用了Node类,但是条件等待队列是单向队列,入口等待队列是双向队列,条件队列中下一个节点的引用是nextWaiter,入口等待队列中下一个节点的引用是next。
  • 条件等待队列中元素的waitStatus必须是-2。

await方法的流程如下图:

3.1 进入条件等待队列

入队方法对应方法addConditionWaiter,这里有三种情况:

  • 队列为空,则新建一个节点,如下图:
  • 队列非空,最后一个元素的waitStatus是-2,如下图:
  • 队列非空,最后一个元素的waitStatus不是-2,如下图:

可以看到,这种情况会从队列第一个元素开始检查waitStatus不是-2的元素,并从队列中移除。

3.2 释放锁

AQS的并发锁是基于state变量实现的,线程进入条件等待队列后,要释放锁,即state会变为0,释放操作会唤醒入口等待队列中的线程。对应方法fullyRelease,返回值是释放锁减掉的state值savedState。

3.3 阻塞等待

释放锁后,线程阻塞,自旋等待被唤醒。

3.4 唤醒之后

唤醒之后,当前线程主要有四个动作:

  • 转入入口等待队列,并把waitStatus改为0。

waitStatus等于0表示中间状态,当前节点后面的节点已经唤醒,但是当前节点线程还没有执行完成。

  • 重新获取锁,如果获取成功,则当前线程成为入口等待队列头结点,interruptMode置为1。
  • 如果当前节点在条件等待队列中有后继节点,则剔除条件等待队列中waitStatus!=-2的节点,即队列中状态为取消的节点。
  • interruptMode如果不等于0,则处理中断。

3.5 一个细节

上面提到了interruptMode,这个属性有三个值:

  • 0:没有被中断
  • -1:中断后抛出InterruptedException,这种情况是当前线程阻塞,没有被signal之前发生了中断
  • 1:重新进入中断状态,这种情况是指当前线程阻塞,被signal之后发生了中断

3.6 扩展

AQS还提供了其他几个await方法,如下:

  • awaitUninterruptibly:不用处理中断。
  • awaitNanos:自旋等待唤醒过程中有超时时间限制,超时则转入入口等待队列。
  • awaitUntil:自旋等待唤醒过程中有截止时间,时间到则转入入口等待队列。

4 signal

唤醒条件等待队列中的元素,首先判断当前线程是否持有独占锁,如果没有,抛出异常。

唤醒条件队列中的元素,会从第一个元素也就是firstWaiter开始,根据firstWaiter的waitStatus是不是-2,分两种情况。

4.1 waitStatus==-2

条件队列第一个节点进入入口等待队列,等待获取锁,如下图:

这里有两个注意点:

  • 如果入口等待队列中tail节点的waitStatus小于等于0,则firstWaiter加入后需要把旧tail节点置为-1(表示后面节点等待当前节点唤醒),如下图:

如果重置waitStatus状态失败,则unpark节点firstWaiter。

  • 如果入口等待队列中tail节点的waitStatus大于0,则unpark节点firstWaiter。

4.2 waitStatus!=-2

如果firstWaiter的waitStatus不等于-2,则查找firstWaiter的nextWaiter,直到找到一个waitStatus等于-2的节点,然后将这个节点加入入口等待队列队尾,如下图:

4.3 waitStatus修改

上面的两种情况无论哪种,进入入口等待队列之前都要用CAS的方式把waitStatus改为0。

5 signalAll

理解了signal的逻辑,signalAll的逻辑就非常容易理解了。首先判断当前线程是否持有独占锁,如果没有,抛出异常。

将条件等待队列中的所有节点依次加入入口等待队列。如下图:

6 使用案例

6.1 示例代码

java并发包下有很多类使用到了AQS中的Condition,如下图:

这里我们以CyclicBarrier为例来讲解。CyclicBarrier是让一组线程相互等待共同达到一个屏障点。从Cyclic可以看出Barrier可以循环利用,也就是当线程释放之后可以继续使用。

看下面这段示例代码:

代码语言:javascript
复制
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
        System.out.println("栅栏中的线程执行完成");
    });
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    executorService.submit(() -> {
        try {
            System.out.println("线程1:" + Thread.currentThread().getName());
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    executorService.submit(() -> {
        try {
            System.out.println("线程2:" + Thread.currentThread().getName());
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    executorService.shutdown();
}

执行结果:

代码语言:javascript
复制
线程1:pool-1-thread-1
线程2:pool-1-thread-2
栅栏中的线程执行完成

6.2 原理讲解

CyclicBarrier初始化的时候,会指定线程的数量count,每个线程执行完逻辑后,调用CyclicBarrier的await方法,这个方法首先将count减1,然后调用Condition的await,让当前线程进入条件等待队列。当最后一个线程将count减1后,count数量等于0,这时就会调用Condition的signalAll方法唤醒所有线程。

7 总结

java的管程模型使用了MESA模型,基于AQS实现的MESA模型中,使用双向队列实现了入口等待队列,使用变量state实现了并发锁,使用Condition实现了条件等待队列。

在AQS的实现中,使用同步队列这个术语来表示双向队列,本文中使用入口等待队列来描述是为了更好的配合管程模型来讲解

AQS的Condition中,使用await方法将当前线程放入条件等待队列阻塞等待,使用notify来唤醒条件等待队列中的线程,被唤醒之后,线程并不能立刻执行,而是进入入口等待队列等待获取锁。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-06-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 君哥聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 官方示例
  • 2 原理简介
  • 3 await
    • 3.1 进入条件等待队列
      • 3.2 释放锁
        • 3.3 阻塞等待
          • 3.4 唤醒之后
            • 3.5 一个细节
              • 3.6 扩展
              • 4 signal
                • 4.1 waitStatus==-2
                  • 4.2 waitStatus!=-2
                    • 4.3 waitStatus修改
                    • 5 signalAll
                    • 6 使用案例
                      • 6.1 示例代码
                        • 6.2 原理讲解
                        • 7 总结
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档