大家好,我是悟空。
今天来一波多线程核心知识点汇总,共 16道,非常经典了。
先关注再看,形成习惯。回复多线程下载 PDF 版
Java 在 1.5 版本之前所谓的线程安全的容器,主要指的就是同步容器。
同步容器的问题:
不过同步容器有个最大的问题,那就是性能差,所有方法都用 synchronized 来保证互斥,串行度太高了。
因此 Java 在 1.5 及之后版本提供了性能更高的容器,我们一般称为并发容器。
并发容器的分类:
并发容器虽然数量非常多,但依然是前面我们提到的四大类:List、Map、Set 和 Queue,下面的并发容器关系图,基本上把我们经常用的容器都覆盖到了。
原子整型类 AtomicInteger 的 getAndIncrement 方法就用到 CAS。
比如这一段代码:
atomicInteger.compareAndSet(10, 20);
调用 atomicInteger 的 CAS 方法,先比较当前变量 atomicInteger 的值是否是10,如果是,则将变量的值设置为20。
CAS 带来的问题:
1)频繁出现自旋,循环时间长,开销大(因为执行的是do while,如果比较不成功一直在循环,最差的情况,就是某个线程一直取到的值和预期值都不一样,这样就会无限循环)
2)只能保证一个共享变量的原子操作
ABA 问题:
因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。
解决方案:
ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 A→B→A 就会变成 1A→2B→3A。
从Java 1.5开始,JDK 的 Atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
原子类AtomicStampedReference
的底层代码:
比较并替换方法compareAndSet
。
compareAndSet 源码
expectedReference
:期望值
newReference
:替换值
expectedStamp
:期望版本号
newStamp
:替换版本号
先比较期望值 expectedReference 和当前值是否相等,以及期望版本号和当前版本号是否相等,如果两者都相等,则表示没有被修改过,可以进行替换。
举例说明:
甲乙线程想改变三角形 A 的形状,乙线程先改成了四边形,后又改成了三角形,三角形 A1->四边形 V2-> 三角形 A3。
当甲线程想改变 A3 为五边形时报错,因为三角形经过已线程修改后,前后版本号不一样,被判定为已修改过,其他线程不能修改。这样就防止了 ABA 问题。
举例说明
原理图1-Java内存模型
Why
:屏蔽各种硬件和操作系统的内存访问差异
JMM是Java内存模型,也就是Java Memory Model,简称JMM,本身是一种抽象的概念,实际上并不存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
Java内存模型的几个规范:
由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写会主内存
,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程:
原理图3-Java内存模型
关于有序性:如果在本线程内观察,所有的操作都是有序的;如果在一个线程中观察另一个线程,所有的操作都是无序的。前半句是指“线程内似表现为串行的语义”(Within-Thread As-If-Serial Semantics),后半句是指“指令重排序”现象和“工作内存与主内存同步延迟”现象。
适用于变量在线程间隔离,而在方法或类间共享的场景。如果用户信息的获取比较昂贵(比如从数据库查询用户信息),那么在 ThreadLocal 中缓存数据是比较合适的做法。
线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从 ThreadLocal 获取的值是之前其他用户的请求遗留的值。
阿里嵩山开发手册:
【强制】必须回收自定义的ThreadLocal变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的ThreadLocal变量,可能会影响后续业务逻辑和造成内存泄露等问题。尽量在代理中使用try-finally块进行回收。
正例:
objectThreadLocal.set(userInfo);
try{
// ...
}
finally{
objectThreadLocal.remove();
}
AbstractQueuedSynchronizer 类如其名,抽象的队列式的同步器,将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 为我们构建同步结构提供了范本。
AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 为我们构建同步结构提供了范本。
AQS 内部数据和方法,可以简单拆分为:
利用 AQS 实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire 操作,获取资源的独占权;还有就是 release 操作,释放对某个资源的独占。
当提交一个新任务到线程池时,具体的执行流程如下:
阻塞队列用于保存等待执行的任务。当任务的数量超过 corePoolSize 数量,后续的任务将会进入阻塞队列,阻塞排队。
有以下几种阻塞队列:
是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。(newFixedThreadPool 用于创建固定线程数)
LinkedBlockingQueue 原理
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用这个队列。(newCachedThreadPool 用于根据需要创建新线程)
SynchronousQueue 原理
一个具有优先级的无限阻塞队列。
PriorityBlockQueue的原理图
之前写过一篇文章:干货 | 45张图庖丁解牛18种Queue,你知道几种?
CPU 密集型计算:大部分场景下都是纯 CPU 计算。
对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。
理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
由于 I/O 设备的速度相对于 CPU 来说都很慢,所以大部分情况下,I/O 操作执行的时间相对于 CPU 计算来说都非常长,这种场景我们一般都称为 I/O 密集型计算。
最佳线程数 = CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。
ThreadPoolExecutor 已经提供了以下 4 种策略。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。
线程池生命周期包括:
生命周期状态和方法对应的关系:
通常开发者都是利用 Executors 提供的通用线程池创建方法,去创建不同配置的线程池,主要区别在于不同的 ExecutorService 类型或者不同的初始参数。
Executors 目前提供了 5 种不同的线程池创建配置:
线程池提供了两个方法来终止线程:shutdown()和shutdownNow()。
shutdown() 方法是一种很保守的关闭线程池的方法。线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。
而 shutdownNow() 方法,相对就激进一些了,线程池执行 shutdownNow() 后,会拒绝接收新的任务,同时还会中断线程池中正在执行的任务,已经进入阻塞队列的任务也被剥夺了执行的机会,不过这些被剥夺执行机会的任务会作为 shutdownNow() 方法的返回值返回。因为 shutdownNow() 方法会中断正在执行的线程,所以提交到线程池的任务,如果需要优雅地结束,就需要正确地处理线程中断。
如果提交到线程池的任务不允许取消,那就不能使用 shutdownNow() 方法终止线程池。不过,如果提交到线程池的任务允许后续以补偿的方式重新执行,也是可以使用 shutdownNow() 方法终止线程池的。
用一个 printStats 方法实现了最简陋的监控,每秒输出一次线程池的基本内部信息:
private void printStats(ThreadPoolExecutor threadPool) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("=========================");
log.info("Pool Size: {}", threadPool.getPoolSize());
log.info("Active Threads: {}", threadPool.getActiveCount());
log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("=========================");
}, 0, 1, TimeUnit.SECONDS);
}
getPoolSize():获取线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁。
getActiveCount():获取活跃的线程数。
getCompletedTaskCount:获取线程池在运行过程中已完成的任务数量。
getQueue().size():获取队列中还有多少积压任务。
恭喜你看到这里,后续还有 Redis,MySQL 系列。
- END -