前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot之线程池ThreadPoolTaskExecutor以及@Async异步注解

springboot之线程池ThreadPoolTaskExecutor以及@Async异步注解

作者头像
海加尔金鹰
发布2020-06-08 14:35:39
28.1K0
发布2020-06-08 14:35:39
举报

前言

最近项目当中有需求,要进行异步的处理,需要使用到线程池,很久没有使用到线程池了,一来是做JAVAweb开发基本上很少用到异步处理,二来是发现有的老项目里面,线程和线程池的使用比较混乱,有好几个线程池,有的线程池是通过spring管理的,有的是自己创建的,然后有的地方是直接创建的线程。所以这里记录下自己在项目当中如何优雅的使用线程池!避免项目当中到处都是线程池!!!

SpringBoot整合ThreadPoolTaskExecutor线程池

ThreadPoolExecutor:这个是JAVA自己实现的线程池执行类,基本上创建线程池都是通过这个类进行的创建! ThreadPoolTaskExecutor :这个是springboot基于ThreadPoolExecutor实现的一个线程池执行类。

In the absence of an Executor bean in the context, Spring Boot auto-configures a ThreadPoolTaskExecutor with sensible defaults that can be automatically associated to asynchronous task execution (@EnableAsync) and Spring MVC asynchronous request processing.

在springboot当中,根据 官方文档的说明,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中,我们只需要按照他的方式调用就可以了!!!

使用springboot默认的线程池

既然springboot有默认的线程池,说明我们可以很简单的进行调用

方式一:通过@Async注解调用

第一步:在Application启动类上面加上@EnableAsync

代码语言:javascript
复制
@SpringBootApplication
@EnableAsync
public class ThreadpoolApplication {
    public static void main(String[] args) {
        SpringApplication.run(ThreadpoolApplication.class, args);
    }
}

第二步:在需要异步执行的方法上加上@Async注解

代码语言:javascript
复制
@Service
public class AsyncTest {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Async
    public void hello(String name){
    	//这里使用logger 方便查看执行的线程是什么
        logger.info("异步线程启动 started."+name);  
    }
}

第三步:测试类进行测试验证

代码语言:javascript
复制
    @Autowired
    AsyncTest asyncTest;
    @Test
    void contextLoads() throws InterruptedException {
        asyncTest.hello("afsasfasf");
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        Thread.sleep(1000);
    }

查看打印的日志:

INFO 2276 --- [ main] c.h.s.t.t.ThreadpoolApplicationTests : Started ThreadpoolApplicationTests in 3.003 seconds (JVM running for 5.342) INFO 2276 --- [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.afsasfasf

可以清楚的看到新开了一个task-1的线程执行任务。验证成功!!!

方式二:直接调用ThreadPoolTaskExecutor

修改上面测试类,直接注入ThreadPoolTaskExecutor

代码语言:javascript
复制
@SpringBootTest
class ThreadPoolApplicationTests {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    AsyncTest asyncTest;
    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Test
    void contextLoads() throws InterruptedException {
        asyncTest.hello("async注解创建");
        threadPoolTaskExecutor.submit(new Thread(()->{
            logger.info("threadPoolTaskExecutor 创建线程");
        }));
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        Thread.sleep(1000);
    }
}

查看打印的日志发现都成功创建线程!!!:

INFO 12360 --- [ task-2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 12360 --- [ task-1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

备注1:如果只使用ThreadPoolTaskExecutor, 是可以不用在Application启动类上面加上@EnableAsync注解的哦!!! 备注2:多次测试发现ThreadPoolTaskExecutor执行比@Async要快!!!

线程池默认配置信息

以下是springboot默认的线程池配置,可以在application.properties文件当中进行相关的设置!!!

代码语言:javascript
复制
# 核心线程数
spring.task.execution.pool.core-size=8  
# 最大线程数
spring.task.execution.pool.max-size=16
# 空闲线程存活时间
spring.task.execution.pool.keep-alive=60s
# 是否允许核心线程超时
spring.task.execution.pool.allow-core-thread-timeout=true
# 线程队列数量
spring.task.execution.pool.queue-capacity=100
# 线程关闭等待
spring.task.execution.shutdown.await-termination=false
spring.task.execution.shutdown.await-termination-period=
# 线程名称前缀
spring.task.execution.thread-name-prefix=task-

深入springboot默认的线程池

根据官方文档的说明,Spring Boot auto-configures a ThreadPoolTaskExecutor 。最终找到springboot的线程池自动装配类:TaskExecutionAutoConfiguration

代码语言:javascript
复制
    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) {
        Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
        Stream var10001 = taskExecutorCustomizers.orderedStream();
        var10001.getClass();
        builder = builder.customizers(var10001::iterator);
        builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique());
        return builder;
    }

同时在ThreadPoolTaskExecutor源码当中可以看到线程池的初始化方式是直接调用的ThreadPoolExecutor进行的初始化。

代码语言:javascript
复制
 protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        BlockingQueue<Runnable> queue = this.createQueue(this.queueCapacity);
        ThreadPoolExecutor executor;
        if (this.taskDecorator != null) {
            executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
                public void execute(Runnable command) {
                    Runnable decorated = ThreadPoolTaskExecutor.this.taskDecorator.decorate(command);
                    if (decorated != command) {
                        ThreadPoolTaskExecutor.this.decoratedTaskMap.put(decorated, command);
                    }

                    super.execute(decorated);
                }
            };
        } else {
            executor = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, (long)this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }

同时会发现默认的线程池拒绝策略是: AbortPolicy 直接抛出异常!!!

private RejectedExecutionHandler rejectedExecutionHandler = new AbortPolicy();

使用自定义的线程池

在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?

自定义Configuration

第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy

代码语言:javascript
复制
@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。

INFO 12740 --- [ myExecutor--2] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 12740 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

第二步:如果配置有多个线程池,该如何指定线程池呢?

代码语言:javascript
复制
@Configuration
public class ThreadPoolConfig {

       @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("poolExecutor")
    public Executor poolExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("taskPoolExecutor")
    public Executor taskPoolExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor3--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

执行测试类,直接报错说找到多个类,不知道加载哪个类:

No qualifying bean of type 'org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor' available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor

由于测试类当中是这样自动注入的:

代码语言:javascript
复制
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor; 

考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!

即修改测试类如下:

代码语言:javascript
复制
    @Autowired
    AsyncTest asyncTest;
    @Autowired
    ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池
    @Test
    void contextLoads() throws InterruptedException {
        asyncTest.hello("async注解创建");
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        poolExecutor.submit(new Thread(()->{
            logger.info("threadPoolTaskExecutor 创建线程");
        }));
        Thread.sleep(1000);
    }

最后得到如下信息:

INFO 13636 --- [ myExecutor2--1] c.h.s.t.t.ThreadpoolApplicationTests : threadPoolTaskExecutor 创建线程 INFO 13636 --- [ myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest : 异步线程启动 started.async注解创建

备注1:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:

代码语言:javascript
复制
	@Async("taskPoolExecutor")
    public void hello(String name){
        logger.info("异步线程启动 started."+name);
    }

备注2:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池

总结

线程池的四种拒绝策略:https://www.cnblogs.com/cblogs/p/9444557.html JAVA常用的四种线程池: https://www.cnblogs.com/zhujiabin/p/5404771.html 线程池的使用是为了管理线程,但是对于线程池项目当中也是要管理起来的。有利于后续的维护!!! 作者:海加尔金鹰

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • SpringBoot整合ThreadPoolTaskExecutor线程池
    • 使用springboot默认的线程池
      • 方式一:通过@Async注解调用
      • 方式二:直接调用ThreadPoolTaskExecutor
      • 线程池默认配置信息
      • 深入springboot默认的线程池
    • 使用自定义的线程池
      • 自定义Configuration
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档