写了几篇扫盲性质的java数据结构文章后,感觉好像都太偏向于理论了,也没有从实战角度出发。写博客的初衷的也是为了提升个人技术的同时,能够将技术更好的应用到日常的学习与工作中【当然,用到面使中也是极好哈,哈哈哈】。本文将给大家大家介绍一个博主日常工作中最喜欢使用的一个线程池工具类。这个类是在我结合很多日常业务场景下整合出来的一个工具类,开箱即用,希望能够帮助到大家
如果有小伙伴在公司的生产环境中使用本文的线程池类请注明来自此博文哦~
老规矩,我们还是照顾一下初学java或者正在学习线程池的同学。先系统性的介绍一下线程池的概念。
线程池维护了一组可重复使用的线程,并且能够在一定范围内进行伸缩扩容可重复使用线程。
我问一下大家常见的新建异步线程的方式有哪几种?OK不卖关子了
我们把123归为一类,4是单独的一类。123创建线程的方式是显示的在代码中调用创建一个一次性使用的线程,如果对应的业务接口被高频访问,那么新建出来的线程就会很多,但是这种线程往往生命周期很多,线程的创建与销毁一来一回就占据很多时间。因此就出现了线程池,将同一类需要执行的那些任务,放到线程池中,让线程池去重复利用线程执行,减少了线程的创建与销毁的次数,还可以充分的利用多核CPU去执行任务,性能拉满。
1.固定线程数的线程池(newFixedThreadPool)
2.缓存的线程池(newCachedThreadPool)
3.单个线程的线程池(newSingleThreadExecutor)
4.固定个数的线程池(newScheduledThreadPool)
上述线程池介绍非本文重点,不做详细展开,我还是暖心的贴上一个介绍的链接:https://www.cnblogs.com/frankyou/p/9467905.html
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1.corePoolSize - 线程池核心池的大小
2.maximumPoolSize - 线程池的最大线程数。
3.keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
4.unit - keepAliveTime 的时间单位。
5.workQueue - 用来储存等待执行任务的队列。
队列类型 ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。 PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。 DelayQueue: 一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue: 一个不存储元素的阻塞队列。 LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。 队列解析:https://www.cnblogs.com/guoyu1/p/13573328.html
6.threadFactory - 线程工厂【用于自定义创建的的线程】
7.handler - 拒绝策略
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认) ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 拒绝策略解析:https://blog.csdn.net/suifeng629/article/details/98884972
线程池如何根据线程池参数运行图
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式, 这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
阿里为什么会给出上面的建议,同学们打开自己的编译器,分别看一下2.3里面jdk提供的线程池的构造方法,再看一下2.4处对应的工作队列,最后结合一下2.4最后的运行图。我想大家明白了吧?
1)newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是使用无界队列,异常情况下堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。 2)newCachedThreadPool 和 newScheduledThreadPool: 主要问题是最大线程数为 Integer.MAX_VALUE,异常情况下回导致创建非常多的线程数,甚至 OOM。
最佳线程池实例支持的功能列表
1.支持懒加载线程池
2.支持ThreadLocal线程变量从父线程传递至子线程
3.支持链路追踪traceId打印问题定位
4.支持批量任务提交
5.支持线程池任务运行异常捕获
import java.util.List;
import java.util.concurrent.Future;
/**
* 线程池接口
*
* @author baiyan
* @since 2021/05/11
*/
public interface ThreadPoolService {
/**
* 向线程池中添加任务
* @param task 任务
* @return 任务(必须实现Runnable接口)
*/
Future<?> addTask(Runnable task);
/**
* 异步执行一批任务,直到任务执行完成
* @param task 任务
*/
void runTasksUntilEnd(List<Runnable> task);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
*/
void loopTask(Runnable task, long interval);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
* @param delay 延迟执行的时间,单位毫秒,表示任务在delay ms后开始被定时调度
*/
void loopTask(Runnable task, long interval, long delay);
/**
* 停止线程池
*/
void stop();
/**
* 获取线程池中正在执行的线程数目
* @return 激活线程数
*/
int getActiveCount();
}
定义业务侧常用的线程池方法
import com.alibaba.ttl.TtlRunnable;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.instrument.async.LazyTraceThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程池实现
*
* @author baiyan
* @since 2021/05/11
*/
@Slf4j
public class ThreadPoolServiceImpl implements ThreadPoolService {
/**
* 主线程数
*/
@Setter
private int corePoolSize = 20;
/**
* 最大线程数
*/
@Setter
private int maximumPoolSize = 60;
/**
* 线程池维护线程所允许的空闲时间
*/
@Setter
private int keepAliveTime = 60;
/**
* 单例线程池
*/
private LazyTraceThreadPoolTaskExecutor threadPoolExecutor;
/**
* 单例定时任务线程池
*/
private ScheduledExecutorService scheduledExecutorService;
/**
* 线程池所使用的缓冲队列的大小
*/
@Setter
private int queueSize = 100;
@Setter
private boolean inited = false;
/**
* 当线程池满时,是否阻塞住
*/
@Setter
private boolean blockWhenFull = true;
/**
* 初始化单例线程池
*/
public synchronized void init() {
if(inited) {
return;
}
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(this.corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(this.maximumPoolSize);
threadPoolTaskExecutor.setQueueCapacity(this.queueSize);
threadPoolTaskExecutor.setKeepAliveSeconds(this.keepAliveTime);
threadPoolTaskExecutor.setThreadNamePrefix("baiyan-Thread-");
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.initialize();
this.threadPoolExecutor = new LazyTraceThreadPoolTaskExecutor(SpringContextUtil.getApplicationContext(), threadPoolTaskExecutor);
inited = true;
}
@Override
public Future<?> addTask(Runnable task) {
if(!inited) {
init();
}
return threadPoolExecutor.submit(TtlRunnable.get(task));
}
/**
* 线程池满时拒绝策略类
*/
private class BlockingQueuePut implements RejectedExecutionHandler {
/**
* 拒绝策略时二次尝试增加队列任务,添加任务失败则进行异常报错
* @param r
* @param executor
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(blockWhenFull) {
try {
executor.getQueue().put(r);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
@Override
public int getActiveCount() {
return threadPoolExecutor.getActiveCount();
}
@Override
public void stop() {
threadPoolExecutor.shutdown();
if(scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
}
@Override
public synchronized void loopTask(Runnable task, long interval) {
loopTask(task, interval, 0);
}
@Override
public void loopTask(Runnable task, long interval, long delay) {
if(scheduledExecutorService == null) {
ThreadFactory threadFactory = new ThreadPoolServiceImpl.ScheduledThreadFactory("schedule-pool-%d-%s");
scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
}
int minInterval=100;
if(interval < minInterval) {
throw new IllegalArgumentException("不允许调度100ms以内的循环任务");
}
scheduledExecutorService.scheduleAtFixedRate(TtlRunnable.get(task), delay, interval, TimeUnit.MILLISECONDS);
}
@Override
public void runTasksUntilEnd(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<Future<?>>();
for(Runnable task : tasks) {
futures.add(addTask(task));
}
for(Future<?> f : futures) {
try {
f.get();
} catch (Exception e) {
log.warn("", e);
}
}
}
/**
* 获取单例线程池实例
* @return
*/
protected LazyTraceThreadPoolTaskExecutor getExecutorService() {
return threadPoolExecutor;
}
/**
* 定时任务线程工厂
*/
static class ScheduledThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ScheduledThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = String.format(namePrefix, poolNumber.getAndIncrement(), "%d");
}
String getThreadName() {
return String.format(namePrefix,
threadNumber.getAndIncrement());
}
//创建守护线程
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, getThreadName(), 0);
if (!t.isDaemon()){
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
当业务应用调用addTask方法时,会先进行判断
if(!inited) {
init();
}
当前线程池是否进行了初始化,如果没有初始化,则先进行初始化动作
调用init()方法
/**
* 初始化线程池
*/
public synchronized void init() {
if(inited) {
return;
}
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(this.corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(this.maximumPoolSize);
threadPoolTaskExecutor.setQueueCapacity(this.queueSize);
threadPoolTaskExecutor.setKeepAliveSeconds(this.keepAliveTime);
threadPoolTaskExecutor.setThreadNamePrefix("baiyan-Thread-");
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
threadPoolTaskExecutor.initialize();
this.threadPoolExecutor = new LazyTraceThreadPoolTaskExecutor(SpringContextUtil.getApplicationContext(), threadPoolTaskExecutor);
inited = true;
}
ThreadPoolTaskExecutor为spring提供的线程池,内部的实现也是基于ThreadPoolExecutor,此处使用LazyTraceThreadPoolTaskExecutor的原因是,为了接入springcloud组件sleut【链路追踪】,便于记录请求或者定时任务调用线程池时的链路。
线程池任务提交支持多种方式
/**
* 向线程池中添加任务
* @param task 任务
* @return 任务(必须实现Runnable接口)
*/
Future<?> addTask(Runnable task);
/**
* 异步执行一批任务,直到任务执行完成
* @param task 任务
*/
void runTasksUntilEnd(List<Runnable> task);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
*/
void loopTask(Runnable task, long interval);
/**
* 向线程池中添加循环运行的任务
* @param task 任务(必须实现Runnable接口)
* @param interval 时间间隔,单位毫秒
* @param delay 延迟执行的时间,单位毫秒,表示任务在delay ms后开始被定时调度
*/
void loopTask(Runnable task, long interval, long delay);
其中对应的方法实现类都包裹了TtlRunnable,使得父线程的ThreadLocal变量可以传递进来。
此处关于线程池内ThreadLocal变量传递解析参考我的另一篇博客:https://cloud.tencent.com/developer/article/2079815
关于核心线程数的设置,网上有一套公式
IO密集型 = 2NCPU
CPU密集型=NCUP+1
我个人觉得是,如果对线程池的运行依赖没有那么高,或者线程池使用频率不高,只是有些任务走的那可以依靠上面的公式来配置核心线程数。我觉得还是需要从实际的线上环境的机器配置,实际业务运行状态进行压测来设置参数。下面推荐两篇文章
1.关于线程池参数如何根据实际业务场景设置:https://www.cnblogs.com/superming/p/12887552.html 2.美团骚操作,线上通过配置中心动态设置线程池参数:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
/**
* 通用线程池
*
* @author baiyan
* @date 2021/05/11
*/
public class CommonThreadPool {
/**
* 单例线程池
*/
private volatile static ThreadPoolService threadPoolService;
/**
* 构造方法私有化
*/
private CommonThreadPool(){
}
/**
* 单例线程池获取
* @return
*/
public static ThreadPoolService getBlockThreadPool() {
if (threadPoolService == null) {
synchronized (CommonThreadPool.class) {
if (threadPoolService == null) {
threadPoolService = new ThreadPoolServiceImpl();
}
}
}
return threadPoolService;
}
}
业务应用基于实现类与接口使用单例模式创建线程池,建议一类业务创建一个线程池,业务应用中线程池总数建议不超过3个。
不捕获异常
@RestController
@Slf4j
public class DemoController {
@GetMapping("/test")
public void test(){
for (int i = 0; i < 10; i++) {
int finalI = i;
CommonThreadPool.getBlockThreadPool().addTask(()->{
log.info("线程线程:"+ finalI);
});
}
}
}
控制台输出
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,01e67ff7c6edc0db,ad8d85b916f42108], [baiyan-Thread-4], com.examp.FileController - 线程线程:3
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,7ab5e7047f3543e5,ad8d85b916f42108], [baiyan-Thread-2], com.examp.FileController - 线程线程:1
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,591a8274b5ee66e5,ad8d85b916f42108], [baiyan-Thread-7], com.examp.FileController - 线程线程:6
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,f1760980349388b9,ad8d85b916f42108], [baiyan-Thread-6], com.examp.FileController - 线程线程:5
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,fb6103c607690d63,ad8d85b916f42108], [baiyan-Thread-8], com.examp.FileController - 线程线程:7
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,f67e9fdbb5f7dfa9,ad8d85b916f42108], [baiyan-Thread-9], com.examp.FileController - 线程线程:8
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,55cb195f3d0f0708,ad8d85b916f42108], [baiyan-Thread-3], com.examp.FileController - 线程线程:2
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,0f5fcc1ec768d003,ad8d85b916f42108], [baiyan-Thread-5], com.examp.FileController - 线程线程:4
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,78339a17929239e4,ad8d85b916f42108], [baiyan-Thread-1], com.examp.FileController - 线程线程:0
- 2021-05-11 21:36:38.596, INFO, [ad8d85b916f42108,747a5b593797f9c7,ad8d85b916f42108], [baiyan-Thread-10], com.examp.FileController - 线程线程:9
需要捕获异常的情况
@RestController
@Slf4j
public class FileController {
@GetMapping("/test")
public void test() throws ExecutionException, InterruptedException {
List<Runnable> runnableList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
runnableList.add(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("1");
});
}
CommonThreadPool.getBlockThreadPool().runTasksUntilEnd(runnableList);
}
}
runTasksUntilEnd方法内部调用feature.get()方法,能够获取到任务执行的异常。
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* spring上下文工具类
*
* @author baiyan
* @date 2021/05/11
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
/**
* spring应用上下文
*/
private static ApplicationContext applicationContext;
/**
* 实现ApplicationContextAware接口的回调方法。设置上下文环境
* @param applicationContext
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @return ApplicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 获取对象
* @param name
* @return Object
* @throws BeansException
*/
public static Object getBean(String name) throws BeansException {
return applicationContext.getBean(name);
}
/**
* 获取对象
* @param clazz
* @return Object
* @throws BeansException
*/
public static <T> T getBean(Class<T> clazz) throws BeansException {
return applicationContext.getBean(clazz);
}
}
//TTL依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.2.0</version>
</dependency>
//链路追踪依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
对于链路追踪与TTL的依赖
github上时候的线程池是jdk提供的ThreadPoolExecutor,因此不支持链路追踪。大家可以根据博客做一下修改就可。当然如果没有链路追踪诉求的,也可以直接使用github上的代码。