前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >三万文字搞定基础面试:Java 并发包(JUC)及应用场景

三万文字搞定基础面试:Java 并发包(JUC)及应用场景

作者头像
35岁程序员那些事
发布2022-09-23 17:24:21
6310
发布2022-09-23 17:24:21
举报

0 目录

工作做螺丝钉,面试造火箭,我想这个是每个程序员比较头疼的事情,但是又有必须经历的流程,尤其是 JDK 基础。本文会从系统思维的角度,将 JUC 的高频面试题,从因果关系层次串联起来(思维导图),便于加强记忆深度。

本期 Chat 内容如下:JUC 核心——AQS、JUC 核心——线程池、 JUC 核心——原子类、JUC 核心——锁、JUC 核心在 Java 容器中的应用和JUC 核心在 Dubbo、RocketMQ、Nacos 等 RPC 框架中的应用。

1 JUC核心——AQS

我写文章比较喜欢脑图,这样才能从系统思维的角度去分析问题的本质,从唯物辨证论的角度,存在即合理。

AQS 核心主要包括:

  • AbstractOwnableSynchronizer;
  • AbstractQueuedLongSynchronizer;
  • AbstractQueuedSynchronizer。

通过分析,我们可以很快 get 到如下信息:

  • 通常所说的 AQS 是指 AbstractQueuedSynchronizer;
  • AbstractQueuedLongSynchronizer 是 AbstractQueuedSynchronizer 同级的变种,功能一致,只是将同步状态从 volatile int state 变更为 volatile long state,增强 AQS 语义,支持 64 位长度的状态机。

1 .1 AbstractQueuedSynchronizer类

从系统思维角度看 AQS 语义如图所示。

从思维导图分析得出如下结论:

  • AQS 核心是 Node 和 ConditionObject;
  • AQS 又是 ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor、Samaphore 以及 CountDownLatch 的核心。

01

Node类

Node 整体系统思维导图如图所示。

Node 状态机:

  • CANCELLED(标注当前线程的等待状态为已经取消)
  • SIGNAL(标注 CLH Queue 中的线程的状态为 unparking)
  • CONDITION(标注线程为条件等待状态)
  • PROPAGATE(标注下一次线程的等待状态必须是共享语义)

Node 总结

Node 本质上是一个链表的基础数据结构,具备非常丰富的语义,基于 Node,AQS 构建了 CLH Queue,并基于这个队列服务于多线程语义环境下的等待线程的业务场景。

02

ConditionObject类

ConditionObject 系统思维导图如图所示。

AQS 封装了条件对象,条件对象 ConditionObject 继承 java.util.concurrent.locks.Condition。

AQS 实现条件变量语义,并作为锁的功能底层基础模板能力,条件对象只是定义了锁和条件用户的基础语义及方法,具体的扩展语义需要使用者自己实现,具体的扩展语义需要借助 AQS 类来扩展实现。

ConditionObject 定义了两个条件队列节点 firstWaiter 和 lastWaiter,并且都是 transient 的。关于 transient 关键字的语义原理就不在这里展开,大致语义就是不会被序列化。条件对象ConditionObject 的具体语义如下:

  • firstWaiter,也叫First node of condition queue,可以理解为头节点,头节点是一个阻塞的线程节点。
  • lastWaiter,也叫Last node of condition queue,可以理解为尾结点。

ConditionObject 总结

  • Condition(java.util.concurrent.locks.Condition)分解 Object monitor 方法(wait()、notify()、notifyAll())进去不同的对象,通过配合 Lock 的实现使用,达到每个对象有个 wait-set 的效果。
  • Lock 可用于替换 synchronized 关键字的使用,Condition 可用于替换 Object monitor 方法的使用。
  • 多个 Condition 提供一种手段使得一个线程可以挂起, 直到某些状态条件成立时被另一个线程唤醒。**由于这些状态条件可以被不同线程访问,它们必须被保护,因此锁会以某种形式关联这些状态条件**。在使用 Condition 前,必须先使用 Lock 获取锁,获得锁后,如果 Condition 不满足,则调用 Condition.wait() 方法等待,该方法会先释放获得的锁,然后挂起当前线程,直到 condtion 满足被通知唤醒,Condition.wait() 方法就像 Object.wati() 方法。
  • Condition 实例会被绑定到一个 Lock 实例上,Condition 实例只能通过 Lock.newCondition() 方法获取。
  • ConditionObject 实现了 Condition 接口,是 AQS 中的内部类,也让 AQS 具备了通过条件变量处理并发问题的能力。

03

AbstractQueuedSynchronizer 总结

从系统思维的角度讲,AbstractQueuedSynchronizer 是从顶层设计 SDK 模式的并发语义,并形成一套语义模板,业务可以灵活的基于模板去做扩展。

1 .2 AbstractQueuedSynchronizer类

AbstractQueuedLongSynchronizer 系统思维导图:

1 .3 AbstractOwnableSynchronizer类

AbstractOwnableSynchronizer 只是一个抽象类里面抽象了一个排他线程 Thread exclusiveOwnerThread,但是并没有具体实现,需要子类去自定义。

2 JUC核心——锁

锁是JUC中非常核心的功能,那么她与AQS有什么关系呢?

2.1 ReentrantLock(可重入锁)

ReentrantLock 整体系统思维导图如下:

分析可重入锁如下场景语义:

  • lock()
  • unlock()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//处理业务逻辑
}finally {
   mainLock.unlock();
}

lock 语义总结如下:

  • ReentrantLock 默认初始化为非公平锁,非公平锁会初始化NonfairSync,公平锁会初始化 FairSync。
  • mainLock.lock() 会执行 sync.lock(),非公平锁就是 “new NonfairSync().lock()“,公平锁就是 “new FairSync().lock()“。

new NonfairSync().lock() 语义:

  • 满足如下条件:基于 unsafe.compareAndSwapInt 语义比较 AQS 的同步状态 state,期望值是 0,更新为 1,更新成功,在 AQS 中设置互斥持有锁的线程变量 exclusiveOwnerThread 为当前线程。
  • 不满足条件:说明已经有线程将同步状态 state 赋值为非 0,执行 acquire(1) 语义。

acquire(1) 语义:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
 }

第一步:acquire(1) 语义处于 AQS 中

第二步:tryAcquire(arg) 语义处于 NonfairSync 和 FairSync 中

第三步:new NonfairSync().tryAcquire(1) 语义会调用 new Sync().nonfairTryAcquire(1)

第四步:new Sync().nonfairTryAcquire(1) 语义非常复杂

final boolean nonfairTryAcquire(int acquires) {
   //获取当前线程
  final Thread current = Thread.currentThread();
  //获取当前 AQS 的同步状态 state
  int c = getState();
  //如果 c==0,表示资源已经释放-unlock
  if (c == 0) {
  //执行 AQS 中的 compareAndSetState,将 state 赋值为非 0,也就是 1
       if (compareAndSetState(0, acquires)) {
       //如果成功
       //在 AQS 中设置互斥持有锁的线程变量 exclusiveOwnerThread 为当前线程
      //直接返回                       
          setExclusiveOwnerThread(current);
           return true;                   
       }
  }
  //如果 c!=0 并且资源被当前线程持有,就累加 state 值,返回 true
  else if (current == getExclusiveOwnerThread()) {
       int nextc = c + acquires;
       if (nextc < 0) // overflow
           throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
    }
    return false;
}

第五步:addWaiter(Node.EXCLUSIVE) 语义,入参为 Node.EXCLUSIVE 一个 null 指针,标记添加的节点处于互斥模式。

private Node addWaiter(Node mode) {
            //基于当前线程构造一个节点,nextWaiter=mode
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            //将 node 节点添加在链表中
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //尾结点为空,说明当前队列还没有构建,需要重新构建队列,并插入新的节点
            enq(node);
            return node;
        }
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

第六步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 语义,使用条件等待去获取当前已经入队列的线程使用权。

final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //返回 node 的前置节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

第七步:AQS 中的 selfInterrupt() 语义,Thread.currentThread().interrupt(),表示要 interrupt 阻塞当前线程。

2.2 ReentrantReadWriteLock(可重读写入锁)

ReentrantReadWriteLock(可重读写入锁),如下图所示。

ReentrantReadWriteLock 从名字中就可以看出,JVM 语义和 ReentrantLock 差不多,只是加了读写分离的这么一个语义。

从 JDK 类的层次结构可以判断,ReentrantReadWriteLock 继承了 ReentrantLock,增加了内部类 WriteLock 和 ReadLock,完成读写的语义,通过 NonfairSync 和 FairSync 完成锁的公平和非公平语义。通过内部类 ThreadLocalCounter 及 HoldCounter 完成 Sync 的读写线程计数的简单语义。

2.3 StampedLock类

StampedLock 整体思维导图如下图。

StampedLock 是 JDK 1.8 引入的一个读写锁,比 ReentrantReadWriteLock 具有更强的读写语义,并且使用更加简单。StampedLock 控制锁有三种模式,一个 StampedLock 状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据 stamp,它用相应的锁状态表示并控制访问,数字 0 表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。

StampedLock 整体线程安全和线程通信是不依赖 AQS,自己又重新定义了一套语义 API。

StampedLock 使用注意事项。

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是 StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。StampedLock 在命名上并没有增加 Reentrant,想必你已经猜测到 StampedLock 应该是不可重入的。

事实上,的确是这样的,StampedLock 不支持重入。这个是在使用中必须要特别注意的。另外,StampedLock 的悲观读锁、写锁都不支持条件变量,这个也需要你注意。

还有一点需要特别注意,那就是:如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。注意这里,我们用的是“乐观读”这个词,而不是“乐观读锁”,是要提醒你,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

2.4 LockSupport类

‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍

LockSupport 是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,当然阻塞之后肯定得有唤醒的方法。

  • park 和 unpark 可以实现类似 wait 和 notify 的功能,但是并不和 wait 和 notify 交叉,也就是说 unpark 不会对 wait 起作用,notify 也不会对 park 起作用。
  • park 和 unpark 的使用不会出现死锁的情况。
  • blocker 的作用是在 dump 线程的时候看到阻塞对象的信息。

3 JUC核心——原子类

JUC 源码包中所有的原子类,我这边整理了一下,总共有如下类,但是有很多我们是不怎么会经常用的,也算是一次扫盲:

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicIntegerFieldUpdater
  • AtomicLong
  • AtomicLongArray
  • AtomicLongFieldUpdater
  • AtomicMarkableReference
  • AtomicReference
  • AtomicReferenceArray
  • AtomicReferenceFieldUpdater
  • AtomicStampedReference
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder
  • Striped64

3.1 AtomicBoolean

AtomicBoolean 是 JUC 包中经常被使用的原子包,归纳如下:

  • AtomicBoolean 通过一个 volatile 的整型变量 value 和基于内存语义的 “static final long valueOffset” 完成原子语义;
  • valueOffset 通过 unsafe.objectFieldOffset,操纵变量的内存偏移地址;
  • 通过 volatile 保证可见性和时序性,通过 unsafe 保证内存语义的原子性,整个过程无锁。

3.2 AtomicInteger

AtomicInteger 也是 JUC 包中经常被使用的原子包,归纳如下:

  • AtomicInteger 封装了 java.lang.Number 类,并重写了 Number 里面的非原子方法;
  • AtomicInteger 通过一个 volatile 的整型变量 value 和基于内存语义的 “static final long valueOffset“ 完成原子语义;
  • valueOffset 通过 unsafe.objectFieldOffset,操纵变量的内存偏移地址;
  • 自增和自减通过 unsafe.getAndAddInt 1 或者 -1 完成语义;
  • JDK 1.8 新增语义方法 getAndUpdate(IntUnaryOperator updateFunction)、updateAndGet(IntUnaryOperator updateFunction)、getAndAccumulate(int x,IntBinaryOperator accumulatorFunction) 和 accumulateAndGet(int x,IntBinaryOperator accumulatorFunction),主要是为了增加 AtomicInteger 的函数式编程语义,将函数作为参数传入。

3.3 AtomicLong

AtomicLong 也是 JUC 包中经常被使用的原子包,归纳如下:

  • AtomicLong 封装了 java.lang.Number 类,并重写了 Number 里面的非原子方法;
  • AtomicLong 通过一个 volatile 的整型变量 value 和基于内存语义的 static final long valueOffset 完成原子语义;
  • 新增 “VM_SUPPORTS_LONG_CAS = VMSupportsCS8()”用作区分当前虚拟机是否支持 ”Unsafe.compareAndSwapLong”;
  • JDK 1.8 新增语义方法 getAndUpdate(LongUnaryOperator updateFunction)、updateAndGet(LongUnaryOperator updateFunction)、getAndAccumulate(long x,LongBinaryOperator accumulatorFunction)和 accumulateAndGet(long x,LongBinaryOperator accumulatorFunction)。

3.4 AtomicReference

AtomicReference 这个原子类应该也是我们平时开发中经常要使用的,归纳如下:

  • AtomicReference 通过一个 volatile 的整型变量 value 和基于内存语义的 “static final long valueOffset“ 完成原子语义;
  • AtomicReference 采用泛型编程,需要传入泛型变量 V;
  • 主要是通过 unsafe.compareAndSwapObject(this, valueOffset, expect, update) 完成内存地址偏移的原子语义;
  • 为了匹配 JDK 1.8 函数式编程,也是新增语义 getAndUpdate(UnaryOperator&lt;V> updateFunction)、updateAndGet(UnaryOperator&lt;V> updateFunction)、getAndAccumulate(V x,BinaryOperator&lt;V> accumulatorFunction)、accumulateAndGet(V x,BinaryOperator&lt;V> accumulatorFunction)。

3.5 AtomicStampedReference

AtomicStampedReference 这个原子类,平常开发中使用的应该不多,归纳如下:

  • AtomicStampedReference 通过一个静态内部类 Pair,来持有当前 Reference 和 Stamp 的一个映射关系,从而能够保持原子更新;
  • 对比 AtomicReference,AtomicStampedReference 性能要比 AtomicReference 高。

3 .6 LongAdder

LongAdder 类是 JDK 1.8 新增的一个内存计数器原子类,性能极高,底层继承了 Striped64 类。

Striped64 是在 Java 8 中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,Striped64 的设计思路是在竞争激烈的时候尽量分散竞争,在实现上,Striped64 维护了一个 base Count 和一个 Cell 数组,计数线程会首先试图更新 base 变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过 Cell 数组来分散计数,Striped64 根据线程来计算哈希,然后将不同的线程分散到不同的 Cell 数组的 index 上,然后这个线程的计数内容就会保存在该 Cell 的位置上面,基于这种设计,最后的总计数需要结合 base 以及散落在 Cell 数组中的计数内容,Striped64 设计的核心是分段锁算法。

LongAdder 实现借助 Striped64,LongAdder.add 首先判断 cells 是否为 null,如果为 null,则会尝试将本次计数累计到 base 上,如果 cells 不为 null,或者操作 base 失败,那么就会通过哈希值来获取当前线程对应的 cells 数组中的位置,获取该位置上的 cell,如果该 cell 不为 null,那么就试图将本次计数累计到该 cell 上,如果不成功,那么就需要借助 Striped64 类的 longAccumulate 方法来进行计数累计。

3 .7 总结

总结如下:

  • 高并发写的场景下,建议最好使用 LongAdder 或者 Striped64 做单机内存计数器;
  • LongAdder 和 Striped64 整体设计思想和 ConcurrentHashMap 类似。

4 JUC 核心——线程池

线程池 ThreadPoolExecutor,系统思维导图:

4 .1 线程池描述

线程池这个话题就大了,我不敢保证会整理的让大家满意,但是也是自己一行行看代码分析出来的,如有问题欢迎拍砖。

首先打开 JUC 包,我们就会看到带 Thread 命名标签的类,比如ThreadPoolExecutor、ThreadFactory、ScheduledThreadPoolExecutor、ThreadLocalRandom。

ThreadPoolExecutor 类图结构如下:

ScheduledThreadPoolExecutor 类结构如下:

说到这里,其实还有几个比较熟悉的类Executors、ExecutorService、Executor,在面试的过程中,很多比较扣细节的面试官,会让你回答 Executors 和 Executor 的区别,以及 ExecutorService 又是做什么的,主要是考察候选人有没有读过 JUC 的源码,当然这样是很变态的。

  • Executors:对 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 以及 Callable 的封装,是一个工厂类;
  • Executor:是一个接口,封装了 execute(Runnable command) 方法,执行维度为 Runnable 类;
  • ExecutorService:是一个接口,ExecutorService 继承 Executor,并增强了并发的语义,增加了很多方法,比如 submit、invokeAll、invokeAny 等方法;
  • ScheduledExecutorService:是一个接口,ScheduledExecutorService 继承 ExecutorService,增强了并发语义,增加了与定时相关的并发方法,比如:schedule、scheduleAtFixedRate、scheduleWithFixedDelay;
  • ThreadFactory:是一个接口,封装了 Thread newThread(Runnable r) 语义,通过 Runnable 来新建线程;
  • Callable:一个函数接口,封装了 `V call() throws Exception` 语义,返回任务的结果。

Executors 类图结构如下:

其实从类图结构,以及这些与线程池相关的基础类的名称就可以大致推断,类整体发展的一个生命周期,例如:ExecutorService 肯定出现在 ScheduledExecutorService 之前,Executor 接口主要是为了保证单一性原则。

4 .2 基础类 Thread

Thread 系统思维导图:

java.lang.Thread 类功能非常强大,搞懂这个类,线程池原理基本搞懂一半,但是并不是所有人都完整的看完过这个基础类的代码。

Thread 就是程序中一个线程的执行.JVM 允许一个应用中多个线程并发执行。每个线程都有优先级,高优先级线程优先于低优先级线程执行,每个线程都可以(不可以)被标记为守护线程。当线程中的 run() 方法代码里面又创建了一个新的线程对象时,新创建的线程优先级和父线程优先级一样,当且仅当父线程为守护线程时,新创建的线程才会是守护线程。

当 JVM 启动时,通常会有唯一的一个非守护线程(这一线程用于调用指定类的 main() 方法),JVM 会持续执行线程直到下面某一个情况发生为止:

  • 类运行时 exit() 方法被调用且安全机制允许此 exit() 方法的调用;
  • 所有非守护类型的线程均已经终止,或者 run() 方法调用返回或者在 run() 方法外部抛出了一些可传播性的异常。

java.lang.Thread 实现接口 java.lang.Runnable 我想这个大家都知道,并且 Runnable 是一个函数接口,标注了@FunctionalInterface,我想这个是为了兼容函数式编程,做的改造,Runnable 里面就一个抽象方法,“void run()”。

线程状态机:

  • State.NEW(初始化状态)
  • State.RUNNABLE(可运行/运行状态)
  • State.BLOCKED(阻塞状态)
  • State.WAITING(无时限等待)
  • State.TIMED_WAITING(有时限等待)
  • State.TERMINATED(终止状态)

这看上去挺复杂的,状态类型也比较多。但其实在操作系统层面,Java 线程中的 BLOCKED、WAITING、TIMED_WAITING 是一种状态,即前面我们提到的休眠状态。也就是说只要 Java 线程处于这三种状态之一,那么这个线程就永远没有 CPU 的使用权。

关键方法分析

1. public static native Thread currentThread()

通常我们会这样用,Thread.currentThread(),返回当前 CPU 正在执行状态中的线程,其实从这个方法的语义中可以看出,JAVA 原生 native 方法只允许只有一个线程处于执行状态,这就是为什么在 I/O 密集型任务中,必须要用多线程来合理利用多核 CPU 的优势,因为 Java 原生就只允许一个线程处于执行状态,当一个线程处于 I/O 阻塞时,就必须要复用线程的执行状态,让其他的线程任务去执行,避免资源浪费。

/**
    * Returns a reference to the currently executing thread object.
    *
    * @return  the currently executing thread.
   */

2. private static native void registerNatives()

Thread 类中初始化操作系统本地 C 函数,并引入到当前 Java 类中,这样 Thread 中所有与线程相关的native 都可以使用。比如 stop0、setPriority0、resume0、interrupt0、suspend0、start0()。

/* Make sure registerNatives is the first thing <clinit> does. */
   private static native void registerNatives();
       static {
           registerNatives();
       }

3. UncaughtExceptionHandler

未捕获异常处理器,在线程由于未捕获的异常终止时,JVM 会进行一些处理,处理流程如下:

  • JVM 调用终止线程的 getUncaughtExceptionHandler 方法获取终止线程的 uncaughtExceptionHandler;
  • 非 null 则调用 uncaughtExceptionHandler 的 uncaughtException 方法,同时将此终止线程和其异常作为参数传入;
  • null 则找到终止线程所在的最上级线程组,调用其 uncaughtException 方法,同时将此终止线程和其异常作为参数传入;
  • 调用 Thread.getDefaultUncaughtExceptionHandler 获取 handle,非空则调用其 uncaughtException 方法,空则判断调用 e.printStackTrace(System.err) 处理。
@FunctionalInterface
       public interface UncaughtExceptionHandler {
           /**
            * Method invoked when the given thread terminates due to the
            * given uncaught exception.
            * <p>Any exception thrown by this method will be ignored by the
            * Java Virtual Machine.
            * @param t the thread
            * @param e the exception
            */
           void uncaughtException(Thread t, Throwable e);
       

4. Caches/WeakClassKey

我们可以看到 WeakClassKey 这个内部类继承了 WeakReference,而WeakClassKey 被 Caches 所使用,从名字我们也能明白其部分含义,本地缓存,WeakClassKey 是弱引用相关类。

subclassAudits 提供了一个哈希表缓存,该缓存的键类型为 java.lang.Thread.WeakClassKey,注意看它的值类型是一个 java.lang.Boolean 类型的,从其代码注释可以知道这个哈希表缓存中保存的是所有子类的代码执行安全性检测结果。

5. 线程初始化init()

Thread 初始化做了哪些事情:

  • 获取父线程,其实就是获取当前正在执行的活跃线程。
  • 通过 System.getSecurityManager() 获取当前 JVM 中初始化的 SecurityManager,熟悉 JDK 底层的都知道,Java 很多 class 类,业务应用开发者是不能调用了,那么权限就是通过 SecurityManager 来控制的。
  • 如果线程组为空,也就是当前虚拟机启动的过程中,当前需要初始化的线程为第一个,会从 SecurityManager 中取出线程组,否则会直接取父线程的线程组。
  • check 当前类的重载 JDK 底层类的权限。
  • check 线程组的权限。
  • 线程组没有启动的线程数自增,连 JDK 底层都在使用 synchronized 保持线程安全,说明 synchronized 性能已经很高了。
  • 初始化 daemon 标志,和父线程保持一致。
  • 初始化 priority,和父线程保质一致。
  • 初始化类加载器。
  • 初始化线程的堆栈信息。
  • 设置线程 ID。

线程的初始化,JVM 语义的角度,是串联了父线程及线程组的关系,这也就是我们在做代码调试的时候,经常会有专门的线程树,例如如下:

4 .3 基于 Thread 的一些热点问题

线程高频问题,这里就不展开,就是点到为止。

1. 新建线程

通过 new 关键字创建一个 Thread 对象,并调用 start() 方法,并通知线程组,线程准备启动,并调用本地方法 start0,启动操作系统级别的一个线程资源。那么 run() 又做了什么,它只是执行了一遍 target.run(),串行的执行了一遍 run 方法中的逻辑,并没有在操作系统中新开启一个线程资源,并绑定到当前的线程树上。

2. 终止线程

终止一个线程,可以使用以下几种方法:

  • 让线程的 run() 方法执行完,线程自然结束。(这种方法最好)
  • 通过轮询和共享标志位的方法来结束线程,例如 while(flag){},flag 的初始值设为真,当需要结束时,将 flag 的值设为 false。(这种方法也不很好,因为如果 while(flag){} 方法阻塞了,则 flag 会失效)

3. 等待(wait)和通知(notify)

wait、notify 及notifyAll(),位于 Object 类中,这就意味着任何类都可以调用这个三个方法,语义如下:

  • 一个线程调用 obj.wait(),那么它就会进入 obj 对象的等待队列,这个等待队列,可能会有多个线程在排队;
  • 当调用 obj.notify 时,会随机的从队列中选中一个线程去执行,并唤醒,这个是不公平的;
  • obj.notifyAll() 会唤醒这个等待队列中的所有线程,但是只是唤醒,如果唤醒之后还是存在资源共享及互斥,还会入队列并更改为线程为等待状态。

4. 挂起(suspend)和继续执行(resume)线程

suspend() 挂起,并不会释放锁资源,只是当前线程会暂定,挂起的线程的状态还是 Runnable,当然挂起也会调用原生方法 “private native void resume0()“,通常 suspend() 方法会和 resume() 配合使用,就是因为 suspend 和 resume 回导致线程状态机语义会混乱,所以 JDK 已经将这两个方法废弃掉了。

5. 等待线程结束(join)和谦让(yield)

在多线程协作的业务场景中,一个线程的执行需要依赖另外一个或者多个线程执行的结果,这个时候就可以使用 JDK 原生的 join 语义方法,底层的 join 是调用 wait 方法来完成方法语义的。

yield 语义就是要当前线程让出 CPU,同时还可以竞争 CPU,yield 语义调用的是 `public static native void yield()` 方法。

6. 线程通信

线程通信本质是基于内存资源完成通信:

  • 基于文件完成通信
  • 基于锁:StampedLock、ReentrantReadWriteLock、LockSupport
  • 基于 AQS
  • 基于并发容器
  • 基于原子语义
  • 基于 unsafe 语义

通过本篇文章系统分析,其实本质线程通信最本质是 AQS。

7.原子性、可见性和时序性

原子性、可见性和时序性是多线程编程必须要掌握的方法论,如果说面试官问题线程安全问题,你一定要围绕这三个特性展开。

8.happen-before 原则

happen-before 原则,也是线程一个比较重要的方法论体系,这个也需要结合原子性、可见性和时序性展开,这样才能将知识点全部串联起来。

5 JUC 核心在 Java 容器中的应用

Java 并发容器中基本都会使用 JUC 语义,这里只列举常用的一些并发容器。

5.1 并发容器 ConcurrentHashMap

ConcurrentHashMap 具体原理就不在这里 Chat 了,有很多优秀的文章解释得非常清楚了,这里只是会重点说下,如何利用 JUC 核心,完成一些并发的语义。

ConcurrentHashMap 业务场景 :

  • ConcurrentHashMap 里面有一个内部类 TreeBin,TreeBin 继承 Node。
  • TreeBin 语义本质上是为 ConcurrentHashMap 中的树持有一个读、写和等待的锁,并通过锁的状态机保证线程安全。
  • TreeBin 定义了一个 TreeNode&lt;K,V> root,树的 root 节点;定义 volatile Thread waiter,等待线程;lockState,状态机,1 代表写、2 代表等待、4 代表读。
  • 获取写锁去重建一颗树时,如果状态机状态为等待,则调用 LockSupport.park(this),挂起当前线程,完成线程间的通信。
  • ConcurrentHashMap 在调用 get(Object key),及需要完成基于 Hash 的搜索功能,最后调用 find(int h, Object k) 方法,如果匹配是基于树的遍历时,当然也可以匹配是否是线性搜索,如果 waiter 不为空,就 LockSupport.unpark(w),unpark 当前的 waiter 线程,完成线程通信。

5.2 并发容器 LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

相比于其他阻塞队列,LinkedBlockingDeque 多了 addFirst、addLast、peekFirst、peekLast 等方法,以 first 结尾的方法,表示插入、获取获移除双端队列的第一个元素。以 last 结尾的方法,表示插入、获取获移除双端队列的最后一个元素。

LinkedBlockingDeque 业务场景:

LinkedBlockingDeque 就使用了全局独占锁以及由锁继承 AQS 而产生的两个条件变量,来完成条件状态的通知变更,从而完成线程通信的。

5.3 并发容器 CopyOnWriteArrayList

Copy-On-Write 写入时复制这个技术,准确地说应该是一种思想,在很多系统设计上都会用到,今天我们来谈一谈 Java 语言中,JDK 运用这种写入时复制的思想的数据结构/容器,CopyOnWriteArrayList。

CopyOnWriteArrayList 业务场景:

CopyOnWriteArrayList 会使用 ReentrantLock 语义来完成读写分离。

/** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();
public boolean addAll(Collection<? extends E> c) {
        Object[] cs = (c.getClass() == CopyOnWriteArrayList.class) ?
            ((CopyOnWriteArrayList<?>)c).getArray() : c.toArray();
        if (cs.length == 0)
            return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (len == 0 && cs.getClass() == Object[].class)
                setArray(cs);
            else {
                Object[] newElements = Arrays.copyOf(elements, len + cs.length);
                System.arraycopy(cs, 0, newElements, len, cs.length);
                setArray(newElements);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

5.4 并发容器 LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

LinkedTransferQueue 业务场景:

LinkedTransferQueue 会构造 Node 节点,节点中的 Thread waiter 就是采用了LockSupport.unpark(p.waiter),来完成线程之间的通信。

6 JUC 核心在 Dubbo、RocketMQ、Nacos 等 RPC 框架中的应用

Dubbo、RocketMQ 和 Nacos 是我们在业务开发中常用的一些分布式中间件,分析这些成熟的框架里面的 JUC 语义,更能加深我们从理论到实际应用的业务场景,更加能够灵活地使用 JUC 语义。

6.1 Dubbo

1. Dubbo 启动核心类 DubboBootstrap

DubboBootstrap 是 Dubbo 新增的功能,主要是针对元数据配置,@since 2.7.5,主要是实现在 2.7.5+ 版本之上。DubboBootstrap 是 Dubbo 的新特性,如果说在这里直接展开 DubboBootstrap 具体的功能,可能需要再重新开启一次 Chat,只列举下功能特性:

  • 初始化 ApplicationModel
  • 启动配置中心,通过 configManager 刷新配置信息
  • 如果没有启动配置中心,兼容老的服务注册中心的配置模式
  • 加载远程配置到本地内存
  • 校验全局配置
  • 初始化元数据服务
  • 初始化事件监听

DubboBootstrap 业务场景:

  • DubboBootstrap 整体是一次性初始化完成,但是有很多任务需要定时执行,那么 Dubbo 就封装了一个基础线程启动的功能,例如:executeMutually。
  • executeMutually 的语义是定时执行一个线程。
  • 单独看 executeMutually 类,也许会很奇怪,为什么要通过一个锁,来控制一个线程的执行,就算是线程池本身也没控制这么细。
  • 分析 executeMutually 的调用入口 release 方法,发现里面会存在 condition.signalAll() 的语义,也就是说定时唤醒线程,采用 signalAll,不是 signal,这样就有意思了,也就是同一个线程在高并发的流量下,并不能保证自己一定会被唤醒,也就是需要外加一把可重入锁,当自己被唤醒之后,定时执行,再次调用 signalAll,因为存在 lock,也会不发生因为快速切换导致的线程安全问题,其实很多线程安全问题,在竞争不是特别恶劣(也就是流量不是特别大的前提下)是不会发生的,只有触发条件才会发生,但是这个条件又存在不很多确定因素。
private void executeMutually(Runnable runnable) {
        try {
            lock.lock();
            runnable.run();
        } finally {
            lock.unlock();
        }
    }

2. Dubbo 注册核心类 DubboRegistry

DubboRegistry 是 Dubbo 的注册中心核心类,可以说整个 Dubbo 基本都是围绕这个类展开的,这里概述下这个类的基本功能,便于业务场景的展开:

  • 在 Dubbo 启动过程中,会为每一个消费者或者生产者接口的元数据 URL,生成一个 DubboRegistry 对象,并存储在 AbstractRegistryFactory.REGISTRIES 中,REGISTRIES 是一个 HashMap,key 值为 URL 按照一定的规则生成的唯一键,value 就是 DubboRegistry,如果存在就不会初始化。
  • DubboRegistry 又继承 FailbackRegistry 和 AbstractRegistry 等底层注册的核心类。
  • 既然是注册核心类,注册肯定是需要 RPC 通信的,所以 DubboRegistry 在初始化的过程中,会完成 RPC 框架通信的连接,也就是 connect()。
  • 如果已经建立连接,就直接跳过,如果没有就需要初始化。
  • DubboRegistry 对外包装了 registryService 服务,对外提供注册、取消注册、订阅、取消订阅、服务销毁等功能。

DubboRegistry 业务场景

  • 定义 clientLock,可重入锁;
  • 锁的范围是控制建立通信连接(服务消费者、服务提供者和注册中心的连接);
  • 其实整个建立连接的过程,主要是靠 Invoker.isAvailable(),完成连接的复用;
  • 加锁是为了防止整个连接的创建过程中的线程安全。

3. Dubbo 通信核心类 AbstractClient

AbstractClient 是 Dubbo 通信传输层的核心类,也是非常关键的一个类:

  • 维护一个名字叫 DubboClientHandler 的客户端线程池
  • 维护 RPC 连接的语义
  • 维护底层 send 及通道相关的语义

AbstractClient 业务场景

因为要维护连接语义,所以需要保证连接过程的线程安全及满足线程之间的通信:

  • 定义 Lock connectLock 可重入锁。
  • connectLock 会锁住 doConnect 方法。
  • doConnect 方法,通常是有子类实现的,比如 NettyClient.doConnect(),看到这个方法,就应该知道 AbstractClient,使用锁的业务场景。
  • NettyClient.doConnect 会完成一次 NIO 通道的创建,并连接,整体初始化过程可以参考 NIO 通信中的 ChannelFuture 初始化的语义。
  • 因为 doConnect 语义并不是说每次调用都会去做初始化连接,所以使用 connectLock 锁,也不会损耗多上性能,其实也是轻量级的。

4. Dubbo 通信核心类 DubboInvoker

DubboInvoker 继承 AbstractInvoker 类,从类的名称来看肯定是与底层通信相关的语义,DubboInvoker 具备如下功能:

  • 维护 ExchangeClient;
  • 维护 invokers;
  • 实现 doInvoke 方法,完成一次 RPC 过程比较重要的 exchange 语义。

DubboInvoker 业务场景:

  • 定义 ReentrantLock destroyLock = new ReentrantLock() 锁。
  • 主要用于控制 invokers 的销毁,以及 ExchangeClient 通道的关闭,加锁是为了防止出现多次销毁的业务场景,因为 DubboInvoker 并不知道上层 API 是如何调用这个功能的。
  • 单独的去分析 destroyLock 的锁的粒度,感觉好像意义不大,那么从另外一个角度考虑,为什么其他的方法不需要加锁?比如 doInvoke 方法。
  • 上游是通过 AbstractInvoker.invoke 去调用 doInvoke 方法。
  • 从我们日常开发中可以看到,我们每次基于 Dubbo 调用一次 RPC,都会执行一次 doInvoke,从线程在虚拟机中的内存分配情况看,整个 doInvoke 过程在一个 JVM 里面是在一个局部方法中,所以说 DubboInvoker 是可以保证线程安全的,即使会存在多线程的场景,但是也会通过 Context 域和线程本地变量保证线程安全。

6.2 RocketMQ

1. RocketMQ RPC 通信核心 MQClientInstance

MQClientInstance 是 RocketMQ 整个底层消息 RPC 的核心类,包括:request-response channel、各种定时器任务、消息拉取线程 pullMessageService、消息负载线程 rebalanceService、消息 push service 线程等,可以说 MQClientInstance 是整个底层消息机制的核心,所以性能及可用性要求非常高。

MQClientInstance 业务场景:

  • MQClientInstance 会定时的处方清理已经下线的 Broker,cleanOfflineBroker 清理维度是当前 JVM 的本地缓存,所以需要 JVM 级别的全局锁 Lock lockNamesrv 来保证线程安全;
  • MQClientInstance 会提供 updateTopicRouteInfoFromNameServer 接口给 Producer,这个时候也是使用 Lock lockNamesrv 锁,因为 updateTopicRouteInfoFromNameServer 与 cleanOfflineBroker 作用域是一样,所以使用的是同一把锁;
  • Lock lockHeartbeat 锁,主要用于控制 Producer 和 Consumer 的心跳资源,试想下,如果 Producer 和 Consumer 不在同一个虚拟机中,锁没问题,如果业务使用不规范,在同一个应用中,自己生产消息,自己消费,那么 MQClientInstance 如何保证 Producer 和 Consumer 端的全局线程安全;
  • MQClientInstance 能够确保一个 JVM 下,基于 UUID 唯一性,确保只初始化一次,但是如果碰到多网卡,或者是虚拟化的机器,这个 UUID 规则还会生效嘛,感兴趣的话,可以一起 review 下 MQClientInstance 如何做的,这里就不展开了。

2. RocketMQ RPC 通信核心,客户端 NettyRemotingClient

NettyRemotingClient 主要封装了 RPC 通信的客户端功能:通道、通道事件监听、RPC Hook 等。

NettyRemotingClient 业务场景:

  • Lock lockChannelTables = new ReentrantLock(),维护一个 lockChannelTables 锁,锁的粒度是 channelTables,一个基于 ConcurrentHashMap 的通道缓存,key 为 IP 地址,value 为通道的一个包装类 ChannelWrapper;
  • 既然 channelTables 是线程安全的,那么为什么在写场景下,RocketMQ 还是加了可重入锁的语义?我想这个还是与 ConcurrentHashMap 的并发语义有关的,因为加锁的带来的性能损耗远比 ConcurrentHashMap 在数据量很大的场景下,由于并发写导致冲突之后的性能损耗的代价小很多。channelTables 中缓存的是通道,但是 key 是 IP,冲突的语义场景要多很多;
  • NettyRemotingClient 中还增加了一个锁语义 Lock lockNamesrvChannel = new ReentrantLock(),所得主要粒度是控制客户端获取或者创建通道,这里其实用到了锁细化的设计,比如如果可以在原子对象中获取到通道,就直接返回通道,这行逻辑是不在锁的范围内的,因为只有读的语义;如果需要重新创建通道,就涉及到写的场景,需要靠锁来保证线程安全。

3.RocketMQ 负载管理核心——负载锁管理 RebalanceLockManager

RebalanceLockManager 为 Broker 的后台命令管理提供基于 Group、Topic、客户端 ID 等维度的锁的管理,从而达到控制单台 Broker 实例的消费者和生产者行为的效果。

RebalanceLockManager 业务场景:

  • 因为 RocketMQ Broker 端事支持命令控制的,这个是给运维权限去动态的更改策略,RebalanceLockManager 就可以动态的锁定消息队列以及解锁消息队列;
  • 锁的资源粒度主要是 mqLockTable,一个基于 ConcurrentHashMap 的内存缓存;
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
  • 支持批量加锁和释放锁、单个加锁和释放锁,但是为了平衡性能以及考虑业务运维会误操作,频繁的调用单个接口,为影响 Broker 的性能,所以单个加锁和释放锁功能在高版本中已经去掉了。

4. RocketMQ 主题配置管理 TopicConfigManager

TopicConfigManager 是 RocketMQ 针对消息主题配置的一个全局管理类,也是消息具备分布式功能的一个比较关键的类。

TopicConfigManager 业务场景:

  • 定义一个 lockTopicConfigTable 用于保证 topicConfigTable 的线程安全,topicConfigTable 是一个基于 ConcurrentHashMap 数据结构的内存缓存,key 值为 topicName value 为 TopicConfig;
  • 锁的粒度主要是在 topicConfigTable 需要变更配置数据,比如新增配置,先变更到内存中,然后再持久化到文件;
  • 这里有一个细节,就是在 lockTopicConfigTable 锁粒度范围内,即时入锁的条件是 topicConfigTable 中不存在某个名称的 Topic 的配置,但是线程入锁之后,还要判断一遍入锁条件,其实这个是严格保证线程安全的一个最佳实践,因为线程加锁和释放锁是需要时间的,但是当流量达到一定的量级,锁成为性能瓶颈,必须要达到单行代码级别的线程安全语义分析;
topicConfig = this.topicConfigTable.get(topic);
 if (topicConfig != null)
 return topicConfig;

6.3 Nacos

1. Nacos 集群选举核心类 RaftCore

RaftCore 类中使用了很多 JUC 的语义,集群选举对于 Nacos 来说,是非常重要的分布式功能。

RaftCore 业务场景:

  • Nacos 同时支持 CP 和 AP 模式的分布式特性,CP 模式采用的分布式 Raft 算法保证数据一致性,RaftCore 又是 Raft 算法实现的核心类,在单个 JVM 进程中,对于多线程通信以及线程安全以及性能之前的权衡,非常关键。
  • 在分布式数据持久化的过程中,见 RaftCore.signalPublish() 方法,使用 OPERATE_LOCK(可重入锁),保证整个分布式配置数据持久化的线程安全,并且锁的粒度,严格控制在指定方法功能内,不包含集群选举的逻辑代码。
  • 在 RaftCore.signalPublish() 方法中还通过 CountDownLatch,来实现按照初始化容量是否为 0,多线程是否等待的语义,这里之所以在 OPERATE_LOCK,可重入锁的控制范围内,还要加线程安全的语义,那是因为,锁是可重入的,并且可重入锁粒度范围内,CountDownLatch 控制的资源,是全局的,也就是说虽然方法整体是线程安全的,但是,还是需要使用,线程安全的语义来保证全局的线程安全,个人理解。

2. Nacos 服务管理核心类 ServiceManager

ServiceManager 主要负责存储整个 Nacos 的服务信息,是 Nacos 中服务治理核心类。

ServiceManager 业务场景:

  • 在 ServiceManager 中定义了全局锁 Lock lock=new ReentrantLock()。
  • 在 ServiceManager 中,维护了基于 LinkedBlockingDeque 的内存缓存,主要是用来存储服务的唯一的关键字信息,其实 LinkedBlockingDeque 里面的方法基本都是是线程安全的,也是基于 ReentrantLock,但是 ServiceManager 在使用 offer、poll 及 add 方法时,还要使用 lock 加可重入锁,我猜作者应该是为了保证 addUpdatedService2Queue 整体输出的线程安全,其实我也不想粘贴代码,太典型了,看大家怎么看?可以作为 Chat 的关键点,一起讨论下,欢迎留言。
  • 使用入口在 RaftController.serviceStatus 中,是一个 HTTP 方法,调用控制在接口使用方,不受 ServiceManager 控制。
String serverIP, String checksum) {
        lock.lock();
        try {
            toBeUpdatedServicesQueue.offer(new ServiceKey(namespaceId, serviceName, serverIP, checksum), 5, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            toBeUpdatedServicesQueue.poll();
            toBeUpdatedServicesQueue.add(new ServiceKey(namespaceId, serviceName, serverIP, checksum));
            Loggers.SRV_LOG.error("[DOMAIN-STATUS] Failed to add service to be updatd to queue.", e);
        } finally {
            lock.unlock();
        }
    }

3. Nacos 集群全局元数据管理类 SwitchManager

SwitchManager 主要负责 Nacos 集群的元数据管理,包括健康检查、黑白名单、心跳模式等,也是一个非常关键的类。

SwitchManager 业务场景:

  • SwitchManager 是一个元数据管理类,所以需要保证单实例的数据一致性;
  • 定义 Lock lock=new ReentrantLock() 全局锁;
  • 主要用 lock 来锁定 SwitchManager.update 方法,保证分布式集群模式下,数据能够一致性的通知到各个集群节点上,整个 update 方法在入口处,会通过 CP 或者 AP 保证只有一个机器可以做更新。

7 全文系统思维总结

本文也许并不是一篇非常详细的关于 JUC 知识点的文章,但是也是我自己结合 JDK 中 JUC 的整体的代码结构,再结合主流的中间件框架如何使用 JUC,完成线程通信的语义,并能保证线程安全,以及 JDK 自带的容器如何使用 JUC 语义,这些都是最佳实践,也是我们程序员在写代码过程中,能够快速参考的使用 JUC 的参照模板。

(1) AQS 是 JUC 的基础

(2) 锁及原子类又增加了并发语义的灵活性

(3) 线程池又是在 AQS 及锁的基础上,完成了对线程的复用

文章会有很多地方写的不完善,但是也是作者花了很多时间整理的,谢谢大家。


下一期:开启35岁程序员高并发认知系列


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构随笔录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档