Spring Boot Async异步执行任务

异步调用就是不用等待结果的返回就执行后面的逻辑,同步调用则需要等带结果再执行后面的逻辑。

通常我们使用异步操作都会去创建一个线程执行一段逻辑,然后把这个线程丢到线程池中去执行,代码如下:

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(() -> {
    try {
        // 业务逻辑
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
    }
 });

这样的方式看起来没那么优雅,尽管用了java的lambda。在Spring Boot中有一种更简单的方式来执行异步操作,只需要一个@Async注解即可。

@Async
public void saveLog() {
    System.err.println(Thread.currentThread().getName());
}

我们可以直接在Controller中调用这个业务方法,它就是异步执行的,会在默认的线程池中去执行。需要注意的是一定要在外部的类中去调用这个方法,如果在本类调用是不起作用的,比如this.saveLog()。 最后在启动类上开启异步任务的执行,添加@EnableAsync即可。

另外关于执行异步任务的线程池我们也可以自定义,首先我们定义一个线程池的配置类,用来配置一些参数,具体代码如下:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;  
/**
 * 异步任务线程池配置
 * 
 * @author yinjihuan
 */
@Configuration
@ConfigurationProperties(prefix = "spring.task.pool")
public class TaskThreadPoolConfig { 
    //核心线程数
    private int corePoolSize = 5;  
    //最大线程数
    private int maxPoolSize = 50;  
    //线程池维护线程所允许的空闲时间
    private int keepAliveSeconds = 60;  
    //队列长度
    private int queueCapacity = 10000;
    //线程名称前缀
    private String threadNamePrefix = "FSH-AsyncTask-";
    public String getThreadNamePrefix() {
        return threadNamePrefix;
    }
    public void setThreadNamePrefix(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }
    public int getCorePoolSize() {
        return corePoolSize;
    }
    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }
    public int getMaxPoolSize() {
        return maxPoolSize;
    }
    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }
    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }
    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }
    public int getQueueCapacity() {
        return queueCapacity;
    }
    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}  

然后我们重新定义线程池的配置:

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration  
public class AsyncTaskExecutePool implements AsyncConfigurer {    
    private Logger logger = LoggerFactory.getLogger(AsyncTaskExecutePool.class);
   
    @Autowired    
    private TaskThreadPoolConfig config;
    
    @Override  
    public Executor getAsyncExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(config.getCorePoolSize());    
        executor.setMaxPoolSize(config.getMaxPoolSize());    
        executor.setQueueCapacity(config.getQueueCapacity());    
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());    
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        //线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy
        //AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
        //CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
        //DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
        //DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();    
        return executor;    
    }  
    @Override  
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {// 异步任务中异常处理  
        return new AsyncUncaughtExceptionHandler() {  
            @Override  
            public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {  
                logger.error("=========================="+arg0.getMessage()+"=======================", arg0);  
                logger.error("exception method:" + arg1.getName());  
            }  
        };  
    }    
}  

配置完之后我们的异步任务执行的线程池就是我们自定义的了,我们可以通过在属性文件里面配置线程池的大小等等信息,也可以使用默认的配置:

spring.task.pool.maxPoolSize=100

最后讲下线程池配置的拒绝策略,当我们的线程数量高于线程池的处理速度时,任务会被缓存到本地的队列中,队列也是有大小的,如果超过了这个大小,我们需要有拒绝的策略,不然就会内存溢出了,目前支持2种拒绝策略:

  • AbortPolicy: 直接抛出java.util.concurrent.RejectedExecutionException异常
  • CallerRunsPolicy: 主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度

建议大家用CallerRunsPolicy策略,因为当队列中的任务满了之后,如果直接抛异常,那么这个任务就会被丢弃,如果是CallerRunsPolicy策略会用主线程去执行,就是同步执行,最起码这样任务不会丢弃。

推荐相关阅读:

《反射面试题-请了解下》

《注解面试题-请了解下》

原文发布于微信公众号 - 猿天地(cxytiandi)

原文发表时间:2018-05-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spring相关

第3章—高级装配—配置profile bean

我们正常开发的过程中经常遇到的问题是,开发环境是一套环境,qa测试是一套环境,线上部署又是一套环境。这样从开发到测试再到部署,会对程序中的配置修改多次,尤其是从...

1062
来自专栏北京马哥教育

Linux环境下 LVM 逻辑卷的建立、扩容和减容操作

LVM是逻辑盘卷管理(Logical Volume Manager)的简称,它是Linux环境下对磁盘分区进行管理的一种机制,LVM是建立在硬盘和分区之上的一个...

2942
来自专栏编舟记

R3 Corda 和 springboot 集成

因为Corda内置的Corda Webserver已经被标记成弃用了,一般不再提供支持;再者,springboot的生态明显占优。

2532
来自专栏个人分享

Spark Netty与Jetty (源码阅读十一)

  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。

1764
来自专栏向治洪

React Native网络请求fetch简单封装

在原生应用开发中,为了方便业务开发人员使用,我们一般会对网络库进行一些上传封装,而不是直接使用,例如基于AFNetworking库的ios请求上层封装,Andr...

6147
来自专栏chenssy

【追光者系列】HikariCP源码分析之故障检测那些思考 fail fast & allowPoolSuspension

由于时间原因,本文主要内容参考了 https://segmentfault.com/a/1190000013136251,并结合一些思考做了增注。

1822
来自专栏XAI

SpringMVC+MongoDB+Maven整合(微信回调Oauth授权)

个人小程序。里面是基于百度大脑 腾讯优图做的人脸检测。是关于人工智能的哦。 2017年第一篇自己在工作中的总结文档。土豪可以打赏哦。 https://git.o...

8877
来自专栏编程心路

SSH框架之旅-spring(4)

下面对 SSH 框架做一个整合,所用的三大框架的版本号 Struts2.3.x,Spring4.x,hibernate5.x。

1274
来自专栏jeremy的技术点滴

docker源码分析-Client创建与命令执行

5232
来自专栏技术墨客

Spring核心——数据校验

在Java数据校验详解中详细介绍了Java数据校验相关的功能(简称Bean Validation,涵盖JSR-303、JSR-349、JSR-380),本文将在...

6162

扫码关注云+社区

领取腾讯云代金券