前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解ThreadPoolExecutor源码

深入理解ThreadPoolExecutor源码

作者头像
大猫的Java笔记
发布2021-09-08 10:36:59
3780
发布2021-09-08 10:36:59
举报
文章被收录于专栏:大猫的Java笔记

1.ThreadPoolExeCutor是什么?

线程池的作用是为了解决频繁创建和销毁线程的性能问题,类似的有MySQL连接池。

2.基础介绍

ThreadPoolExecutor得7个参数

ThreadPoolExecutor最常用的构造方法有7个参数如下:

1.corePoolSize(核心线程数),核心线程数可以理解为公司的人员。无论忙还是不忙你都在公司上班,不会说闲的时候就被开除。

2.maximumPoolSize(最大线程数),最大线程数可以理解为公司最多的员工能达到多少,包括了外包人员。

3.keepAliveTime(非核心线程存活时间),非核心线程可以理解为公司的外包人员,存活时间就是指外包在公司最多只能待多久时间,超过这个时间就要离开。

4.unit(非核心线程存活时间单位)

5.workQueue(任务队列),任务队列是指线程提交的任务存放的地方,可以理解为公司的需求量太大,此时本部人员忙不过来了。先把任务放在一个队列中,等核心人员有空了就去处理,如果任务队列也放不下了,此时就叫外包人员来帮忙。

6.threadFactory(线程工厂),用于线程的生产。

7.handler(拒绝策略),前面说过如果任务太多核心线程忙不过来,此时交给任务队列,而任务队列也满了,此时如果最大线程数就是核心线程数,执行拒绝策略。如果新加非核心线程,如果非核心线程+核心线程大于最大线程数执行拒绝策略。

拒绝策略与自定义拒绝策略

拒绝策略是指当核心线程忙不过来的时候,同时任务队列满了,线程数大于最大线程数的时候此时线程池的拒绝方式。其中拒绝策略能够进行自定义,Java中提供的4中拒绝策略如下。

1.AbortPolicy(直接抛出RejectedExecutionException异常)

2.CallerRunsPolicy(将任务交给提交任务者执行)

3.DiscardOldestPolicy(丢弃任务队列中最早的任务,)

4.DiscardPolicy(丢弃当前任务)

通过7个参数中的拒绝策略可以看到RejectedExecutionHandler是个接口,所以要自定义拒绝策略只需要实现RejectedExecutionHandler。

为什么阿里规范需要自定义线程池

如果使用Executors中JDK提供好的线程池如newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、这些都是使用ThreadPoolExecutor进行实现的,但是他们的工作队列都是采用的无边界的LinkedBlockingQueue,当提交任务过多,核心线程处理不过来的时候,任务会大量堆积在LinkedBlockingQueue中,而此时会造成OOM的异常。原因也就是LinkedBlockingQueue的最大为Integer.MAX_VALUE。

线程池的状态

线程池一共有5种状态

1.RUNNING状态,线程池运行中。能接受新的任务,也会处理队列中的任务。

2.SHUTDOWN状态,线程池调用shutDown方法的时候变成此状态,同时线程池不接受新的任务,但是会处理掉任务队列中的任务。

3.STOP状态,线程池调用shutDownNow方法的时候变成此状态,同时线程池不接受新的任务,也不处理任务队列中的任务。

4.TIDYING状态,所有的任务都已终止,workerCount (有效线程数)=0 。线程池进入该状态后会调用terminated()钩子方法进入TERMINATED 状态。

5.TERMINATED状态,线程池结束的状态。当进入TIDYING状态后执行完 terminated()钩子方法(默认该方法没有任何内容,可以自己手动实现),执行完毕后变成TERMINATED状态。

3.源码中重要的属性、方法和类

如Executor接口中提供了exectute方法,ExecutorService则是对Executor的继承,同时定义了线程池中的常用方法,而Executors则扮演的是一个工厂,负责生产一些常用的线程池(例如上面所说的几种JDK提供的线程池),而由于ThreadPoolExecutor是继承AbstractExecutorService同时AbstractExecutorService实现了ExecutorService所以ThreadPoolExecutor具备ExecutorService的方法。

ThreadPoolExecutor中重要的字段

1.ctl是一个32位的int类型字段,其中高3位表示线程池的状态,由于线程池有5个状态,如果使用2位表示是不够的,所以使用高3位表示线程状态,低29位表示有效线程数量。

2.线程的状态,一共5个,只有running状态是小于0的(方便判读)

3.workers是一个HashSet用于存放worker。worker具体是什么下面介绍。

重要的类Worker

继承AbstractQueuedSynchronizer,说明有了AQS的能力,也就是有加锁和解锁的能力,同时实现Runnable并重写了run方法。其中Worker中只是写了一部分的方法。

重要的方法

workerCountOf方法的作用是拿到线程数量 c是ctl其中CAPACITY为00011111 11111111 11111111 11111111通过&得到低29位也即是线程数量。

runStateOf方法的作用是拿到线程池的状态 c是ctl其中CAPACITY为00011111 11111111 11111111 11111111 取反以后得到11100000 00000000 00000000 00000000,然后与c相当于取线程池当前的状态。

runStateOf方法的作用相当于求rs和wc的并集,即把线程池状态和线程数量进行合并成为ctl。

isRunning方法的作用是判断线程池是不是running状态,只有running状态才是小于SHUTDOWN的。

4.execute方法源码

1.拿到ctl。

2.如果工作线程数小于核心线程数,那么添加核心线程数(addWorker(command,true),true表示核心线程,false为非核心线程) 添加成功直接返回,失败判断线程池的状态。

3.如果线程池状态是running状态,说明核心线程数满了此时加入到任务队列,添加后重新拿到ctl进行复查,如果不是running状态 说明在添加成功后线程池被关闭了,此时将刚刚放入的任务remove掉,并且执行拒绝策略,如果是running状态且工作线程数为0 此时添加一个空的非核心线程。

4.如果不是running状态,说明此时线程池被关闭了,那么执行addWorker(command, false),需要执行这一步的原因是,因为不知道是 任务队列满了还是线程池被关闭,使用addWorker(command, false)相当于确认一下是不是线程数达到最大数,如果是false直接执行拒绝策略

实际上execute的流程如下

1.核心线程是否不满,添加核心线程。

2.如果满了添加到任务队列,如果任务队列没有满添加成功。

3.如果任务队列也满了,此时线程数是否大于最大线程数 如果不大于最大线程数,此时添加非核心线程数,如果非核心线程数满了执行拒绝策略。

addWorker方法源码

addWorker源码

1.取ctl和线程池的状态

2.判断线程池的状态是不是其中一种任意一种状态SHUTDOWN SHUTDOWN STOP TIDYING TERMINATED 也就是线程池被关闭了,同时当前状态不是SHUTDOWN状态,或者firstTask不为空,或者工作队列是空的,返回false。

3.判断线程池的工作线程数量是否大于最大值,或者大于核心线程数或最大线程数返回false,不大于使用cas线程数+1。

4.如果cas失败,说明存在其他线程也在+1,此时判断线程池状态是否关闭,关闭则退出循环,否则死循环直到+1成功或者大于最大线程数退出。

5.cas成功后添加Worker,Worker的定义可以参考上面,同时把Worker加到HashSet中。

6.如果线程池是running状态,或者为SHUTDOWN状态同时firstTask为null的时候此时workerAdded状态为true,表示添加成功。

7.根据添加成功的条件进行启动线程,调用start方法。

runWorker方法源码

根据execute方法和addWorker方法实际上如果核心线程任务处理完毕后,或者说核心线程空闲的时候应该及时处理队列中的任务,但是可以看到execute方法和addWorker方法中没有具体的实现,实际上这部分的实现是存在Worker类的run方法中。其中run方法调用了runWorker方法。

runWorker方法的作用就是从getTask方法中获取任务,并执行提交的任务,但是getTask方法是可能会被阻塞的,原因是没有任务的时候,当然如果非核心线程拿不到任务根据存活时间还是没有拿到此时会getTask返回null,同时会调用runWorker中的processWorkerExit方法将线程从HashSet中移除。

getTask方法源码

getTask的作用就是获取线程池状态,然后判断是否核心线程允许超时,以及从队列中拿到任务。如果核心线程设置为允许超时,实际上核心线程也会被销毁。其中40到42行代码就是判断是否是非核心线程或者是核心线程允许超时,此时调用的是workQueue.poll()方法跟了一个超时时间,此处的超时时间就是线程存活时间,如果这个时间都没有拿到数据,此时会重新循环,并且timedOut为true。然后26到42会返回null,从而调用runWorker方法的processWorkerExit进行线程的销毁。当然如果没有设置核心线程为超时,此时的核心线程会调用take()方法进行任务处理,如果能获取到任务就处理,获取不到就阻塞(也就是我们说的空闲状态的核心线程,但不进行销毁,一旦新任务来了线程马上进行消费)

processWorkerExit方法源码

processWorkerExit的作用就是把核心线程设置为可以超时的以及非核心线程存活时间已经超过了的时候,此时从workers的HashSet中将线程移除掉。同时如果是被中断或者是getTask为null的都进行工作线程-1的操作。以及如果线程池状态变成了关闭以后的状态,这里只会是被中断的时候才会产生。所以需要保留一个线程对任务队列中的任务进行处理。

tryTerminate方法源码

tryTerminate的作用就是如果线程池关闭了,中断阻塞的线程,然后如果队列不是空的此时的任务依然需要被其他线程处理,同时需要将线程池的状态设置为TIDYING,同时工作线程设置为0,最后调用trminated钩子方法设置状态为TERMINATED方法将工作线程再次设置为0,最后唤醒阻塞再awaitTermination方法上的调用线程。

interruptIdleWorkers方法源码

interruptIdleWorkers的方法就是将空闲的线程进行打断,怎么判断是否空闲了,那就是阻塞在getTask方法上的线程也就是说线程从队列中获取不到任务,那么就代表是空闲的,所以interruptIdleWorkers中判断线程空闲会看是否中断如果中断说明已经被打断了不需要再去叫,同时还要看是否能够拿到锁,原因是再runWorker中线程如果再运行的话实际上会加锁。如果即没有中断也能拿到锁,说明线程是空闲的。到此为止,execute方法的源码已经结束。

5.shutdown方法源码

shutdown方法会更改线程池的状态,同时将空闲的线程进行打断,并且最后调用onShutdown()这个钩子方法,并且通过tryTerminate方法对线线程池最后的状态变更以及判断是否没有剩余线程去消费任务队列中的任务。如果是则启用一个第一个任务为空的非核心线程去处理。

tryTerminate方法源码

拿到ctl,同时看状态是否为目标状态,如果是则说明已经是此状态了,那么不需要进行cas进行更改状态,直接退出,如果不是则使用cas进行更改,cas失败则一直进行死循环直到要么已经是目标状态,要么修改成功。

6.shutdownNow方法源码

shutdownNow方法相比shutdown,是不接受新的任务,同时也不处理任务队列中剩余的任务。所以会将工作队列中剩余的任务进行返回。

drainQueue方法源码

drainQueue方法的作用就是将任务队列中剩余的任务添加到List中。

整个ThreadPoolExecutor方法的源码到此结束,其中某些方法没有进行分析,由于水平有限或编写问题当中可能涉及到错误的地方,望指正。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大猫的Java笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档