大家好,我是田哥
前两天,有位星友(知识星球里的朋友简称)私信我,问在项目中如何使用线程池,关于线程池的原理和八股文相关的都可以背,但是要是问到你们项目中是怎么用的,心里总是有点慌。
话不多说,我们直接步入正题。
我在这篇文章中聊过线程池相关的:
《阿里巴巴JAVA开发手册》
有这样一条强制规定:线程池不允许使用Executors
去创建,而应该通过ThreadPoolExecutor
方式,这样处理方式更加明确线程池运行规则,规避资源耗尽风险。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
上面这两种方式创建线程池使用的阻塞队列是LinkedBlockingQueue
/**
* A constant holding the maximum value an {@code int} can
* have, 2<sup>31</sup>-1.
*/
//2的31次方,然后在减1 2147483647
@Native public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
不设大小理论上队列容量无上限,所以可能会堆积大量请求从而导致OOM
。
以上这种方式在,咱们就不聊了。
在项目中,我们通常有两种方式创建线程池:
Spring Boot
创建线程池比如说我们项目中需要处理用户登录日志,但是此时不想因为记录登录日志耽搁了登录。
如果我们使用同步的方式,可能会因为一些不太需要实时结果的,并且又耗时的业务可能会导致整个业务变慢:
耗时:200ms=100ms+100ms
如果使用线程池做了异步化后,直接创建个任务丢到线程池里,这样就减少了后面那100ms的等待时间。
在实际项目中,也有很多项目使用消息队列来做异步化,这个看项目情况来,比如:开发成本、后期运维成本等。
静态方式就是当做一种工具类使用,代码实现如下:
package com.tianwc.myblog.pool;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//这里的相关只是一个演示,大家在定参数时,还是要以具体情况来
public class ThreadPoolUtil {
//获取CPU核数
static int cpuNums = Runtime.getRuntime().availableProcessors();
/** 线程池核心池的大小*/
private static int corePoolSize = 10;
/** 线程池的最大线程数*/
private static int maximumPoolSize = cpuNums * 5;
//阻塞队列容量
private static int queueCapacity = 100;
//活跃时间,
private static int keepAliveTimeSecond = 300;
public static ExecutorService httpApiThreadPool = null;
static{
System.out.println("创建线程数:"+corePoolSize+",最大线程数:"+maximumPoolSize);
//建立10个核心线程,线程请求个数超过20,则进入队列等待
httpApiThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeSecond,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
}
}
关于线程池参数设置,可以参考:面试小抄中并发编程部分有详细说明。
在业务代码中的使用:
ThreadPoolUtil.httpApiThreadPool.submit(new Thread(new Runnable() {
@Override
public void run() {
System.out.println("======== 登录日志记录=====start=======");
try {
// TODO: 2022/4/14 业务处理
Thread.sleep(1000L);
System.out.println("userId=" + userId);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("========登录日志记录------end=======");
}
}));
很简单吧!
但是这种方式存在很多问题,很多项目也是这么在用的。
比如想动态修改线程池参数,这种方式就不好处理了
我们再来看看Spring Boot
创建方式;
我们可以把线程池相关参数配置在配置文件中application.yaml(application.properties)
。
threadpool:
corePoolSize: 8
maxPoolSize: 16
queueCapacity: 5
keepAliveSeconds: 300
然后创建线程池:
package com.tianwc.myblog.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync //开启异步请求
public class ThreadPoolConfig {
@Resource
private Environment env;
//创建线程池
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setThreadNamePrefix("--------------全局线程池-----------------");
pool.setCorePoolSize(Integer.parseInt(env.getProperty("threadpool.corePoolSize")));
pool.setMaxPoolSize(Integer.parseInt(env.getProperty("threadpool.maxPoolSize")));
pool.setKeepAliveSeconds(Integer.parseInt(env.getProperty("threadpool.queueCapacity")));
pool.setQueueCapacity(Integer.parseInt(env.getProperty("threadpool.keepAliveSeconds")));
// 直接在execute方法的调用线程中运行
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
pool.initialize();
return pool;
}
}
我们在项目中使用:
package com.tianwc.myblog.pool;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class AsynchronousTask {
@Async("taskExecutor")
public void recordLoginLog(Long userId){
System.out.println("======== 登录日志记录=====start=======");
try {
// TODO: 2022/4/14 业务处理
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("========登录日志记录------end=======");
}
}
然后在登录的代码中使用:
@Resource
private AsynchronousTask asynchronousTask;
public Boolean login(){
//登录处理
Long userId=10001;
boolean isSucc=true;
if(isSucc){
asynchronousTask.recordLoginLog(userId);
}
System.out.println("=====登录成功====");
}
输出日志:
=====登录成功====
======== 登录日志记录=====start=======
userId=10001
========登录日志记录------end=======
好了,以上就是我们项目中通常使用的方式,另外,注意,在项目中通常是将注解@EnableAsync
放到项目启动类上。
关于线程池的实际使用,建议给大家看看美团的线程池技术方案,感兴趣的自己搜搜。
文中很多线程池相关的知识没有介绍,因为之前有一篇文章已经介绍过了,这里就不赘述了。