前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >在Spring项目中以多线程的方式并发执行,异步处理任务。解决统计、累加类业务的例子。

在Spring项目中以多线程的方式并发执行,异步处理任务。解决统计、累加类业务的例子。

原创
作者头像
AlbertZhang
修改2020-10-14 17:41:08
2.6K0
修改2020-10-14 17:41:08
举报
文章被收录于专栏:技术文献技术文献

业务描述:

其实具体业务无所谓,这次解决的问题是“统计、累加类业务类型”,这里的业务就用”统计动物园中所有种类动物数量的总和”,类比代替了。

我要写一个接口,吐出 “动物园所有种类动物的总和”。已知目前有 15种动物,现在有现成的查询每种动物数量的接口,每种动物都要调用RPC接口去别的系统查询。且耗时较高。

工作方案:

根据上面的描述,线性去查询,调用15次RPC接口,时间花费巨大,所以放弃单线程模式。打算使用多线程的方法,进来请求后,分发 15个线程去查每一种动物的数据,返回结果。用多线程的话,在项目中肯定首先考虑使用线程池。

具体实现 (线程池 + 线程 + CountDownLatch ):

1、配置线程池
代码语言:javascript
复制
    <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- 核心线程数  -->
    <property name="corePoolSize" value="15" />
    <!-- 最大线程数 -->
    <property name="maxPoolSize" value="30" />
    <!-- 队列最大长度 >=mainExecutor.maxSize -->
    <property name="queueCapacity" value="30" />
    <!-- 线程池维护线程所允许的空闲时间 默认为60s-->
    <property name="keepAliveSeconds" value="180" />
    <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
    <property name="rejectedExecutionHandler">
        <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
        <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
    </property>
    </bean>

ThreadPoolTaskExecutor 是Spring 对JUC包内的ThreadPoolExecutor上的封装,能配置Bean,注入SpringIOC 容器中,交给Spring管理

或者springBoot:

代码语言:javascript
复制
@Configuration
@EnableAsync
public class AsyncConfig {

    public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";

    @Bean(name = ASYNC_EXECUTOR_NAME)
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setTaskDecorator(new CopyingDecorator());
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }

2、Service 实现
代码语言:javascript
复制
    /**
     * 查询数量使用的线程池
     */
    @Autowired
    @Qualifier("threadPool")
    private ThreadPoolTaskExecutor threadPool;


  public long getAllAnimalCount(int accountType, String account) {
       try {
            // 初始化返回结果
            AtomicLong resultValue = new AtomicLong(0);
            // 获取所有的动物类型
            AllTypeEnum[] enumValues = AllTypeEnum.values();
            // 开启倒计时协调器
            CountDownLatch countDownLatch = new CountDownLatch(enumValues .length);
            // 用线程池分发线程分配处理每一个类型
            for (AllTypeEnum tempEnum : enumValues ) {
                threadPool.execute(new AnimalCountThread(account, accountType, tempEnum.getType(), resultValue, countDownLatch));
            }
            // 等所有线程都处理完之后再拿返回结果
            countDownLatch.await();
            return resultValue.get();
        } catch (InterruptedException e1) { 
            log.error("出现线程中断异常", e1);
        } catch (Exception e2) { 
            log.error("出现未知异常", e2);
        }
        return 0;
    }
3、线程的定义
代码语言:javascript
复制
/**
 * 查询动物数量线程
 *
 * @author XXX
 * @date 2020/05/14
 */
@Data
@Slf4j
public class AnimalCountThread implements Runnable {

    /**
     * 账号
     */
    private String account;

    /**
     * 账户类型
     */
    private int accountType;

    /**
     * 动物类型 来自枚举
     */
    private int type;

    /**
     * 累加的目标值
     */
    private AtomicLong targetValue;

    /**
     * 栅栏
     */
    private CountDownLatch countDownLatch;


    /**
     * 构造函数
     *
     * @param account 账号
     * @param accountType 账号类型
     * @param type 动物类型
     * @param targetValue 累加的目标值
     * @param countDownLatch 栅栏
     */
    public AnimalCountThread (String account, int accountType, int type, AtomicLong targetValue, CountDownLatch countDownLatch) {
        this.account = account;
        this.accountType = accountType;
        this.type = type;
        this.targetValue = targetValue;
        this.countDownLatch = countDownLatch;
    }


    @Override
    public void run() {
        try {
            AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
            if (typeEnum != null) {
                //获取具体业务Bean
                CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
                long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());  
                targetValue.getAndAdd(num);         
            }
        }catch (Exception e) {
            log.error("线程执行出现异常",e);
        } finally {
            countDownLatch.countDown();
        }

    }

}

总结 : 

1、线程中是无法直接使用注解注入JavaBean的,所以我从Spring容器里拿的。或者也可以不定义这个线程,使用匿名内部类的方法。

2、累计的目标值,直接使用 AtomicLong  省得自己去同步。

3、用CountDownLatch 等所有线程都处理完,主线程再拿返回结果。

4、CountDownLatch 在子线程中,一定要保证被调用到 countDown()。

5、线程池配置拒绝策略,另外三种都丢弃了任务,所以用交给主线程的这种方法比较适合当前业务。

6、线程池的配置队列长度:要是追求性能的话不能过长。越长耗时越长,接口性能越差。

7、接口最外层要合理使用缓存,缓解压力,在对外RPC接口出还可以配置限流。 由于运用了多线程,快进快出, 限流是为了减小峰值。快进快出的话即使限流。 吞吐量也会比不用“多线程”大。

8、一定要压测一下,对于线程池的配置,也可以根据压测结果,调配。

5月22日补充:

上面的实现方式,由于线程实例是实现Runable接口的方式,Runable run() 方法没有返回值的原因,所以用了公共的参数,AtomicLong  在线程内部累计计算的结果。而且用了CountDownLatch 进行同步操作,来保证主线程获取结果时,所有子任务处理完毕。

如果我们用其他方式时可以不用这两步。

先说线程池 +Callable + Future的方式。

一、Callable接口是jdk 1.4 以后提供的,能返回值,并且能抛异常。 public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } Callable一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本: <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); 因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。 二、Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

线程池 +Callable + Future的方式

1、线程池的配置

同上

2、Service 实现
代码语言:javascript
复制
      /**
     * 查询数量使用的线程池
     */
    @Autowired
    @Qualifier("threadPool")
    private ThreadPoolTaskExecutor threadPool;

    @Override
    public long getAllAnimalCount(int accountType, String account) {
        try {
            // 初始化返回结果
            Long resultValue = 0L;
            // 获取所有的预警类型
            AllTypeEnum[] enumValues = AllTypeEnum.values();
            // 初始化,Future结果容器
            List<Future<Long>> futureList = new ArrayList<>();
            // 分发任务
            for (AllTypeEnum tempEnum : enumValues) {
                Future<Long> tempResult = threadPool.submit(new AnimalCountTask (account, accountType, tempEnum.getType()));
                futureList.add(tempResult);
            }
            // 获取所有结果
            for (Future<Long> tempFuture : futureList) {
                try {
                    if (tempFuture.get() != null) {
                        resultValue += tempFuture.get(); // 会阻塞,直到这个任务执行完毕。
                    }
                } catch (Exception e) {
                    log.error("getAllAnimalCount,线程执行出现异常", e);
                }
            }
            return resultValue;
        } catch (Exception e) {
            log.error("[getAllAnimalCount]出现异常", e);
        }
        return 0;
    }
3、Task 任务
代码语言:javascript
复制
/**
 * 查询动物数量任务 Callable版本
 *
 * @author XXX
 * @date 2020/05/14
 */
@Data
@Slf4j
public class AnimalCountTask implements Callable<Long> {

    /**
     * 账号
     */
    private String account;

    /**
     * 账户类型
     */
    private int accountType;

    /**
     * 动物类型
     */
    private int animalType;

    public AnimalCountTask(String account, int accountType, int animalType) {
        this.account = account;
        this.accountType = accountType;
        this.animalType= animalType;
    }


    @Override
    public Long call() throws Exception {
         AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
            if (typeEnum != null) {
                //获取具体业务Bean
                CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
                long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());  
                return num;       
            }
        }
       return null;
    }
}

这种方式的实现,可以看到 获取结果  resultValue += tempFuture.get(); 时会阻塞。循环获取的时候,假如你第二个任务用时最长,那他在for循环的第二次时候,等半天才接着处理其他的。 

这个问题呢,可以优化。我想哪个子任务先做完,我就先获取那个子任务的结果,而不是傻傻的线性的一个任务一个任务的看。

JDK 8 提供了 CompletionService   具有这样的功能。它的实现类内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

CompletionService是Java8的新增接口,JDK为其提供了一个实现类ExecutorCompletionService。这个类是为线程池中Task的执行结果服务的,即为Executor中Task返回Future而服务的。 CompletionService的实现目标是“任务先完成可优先获取到,按完成先后顺序排序” public interface CompletionService<V> { // 提交 Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); // 获取 Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }

  • Future<V> submit(Callable<V> task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;
  • Future<V> submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;
  • Future<V> take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
  • Future<V> poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;
  • Future<V> poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;

线程池 +Callable + ExecutorCompletionService 的方式:

1、线程池的配置

同上

2、Service实现
代码语言:javascript
复制
  /**
     * 查询数量使用的线程池
     */
    @Autowired
    @Qualifier("threadPool")
    private ThreadPoolTaskExecutor threadPool;

    @Override
    public long getAllAnimalCount(int accountType, String account) {
        try {
            // 初始化返回结果
            Long resultValue = 0L;
            // 获取所有的动物类型
            AllTypeEnum[] enumValues = AllTypeEnum.values();
             // 实例化 CompletionService
            CompletionService<Long> completionService = new ExecutorCompletionService<>(threadPool);
            // 用CompletionService提交分发任务
            for (AllTypeEnum tempEnum : enumValues) {
                completionService.submit(new AnimalCountTask(account, accountType, tempEnum.getType()));
            }
            // 拿取返回值并计算总和
            for (AllTypeEnum tempEnum : enumValues ) {
                try {
                    Long value = completionService.take().get();// 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
                    if (value != null) {
                        resultValue += value;
                    }
                } catch (Exception e) {
                    log.error("[getAllAnimalCount]线程执行出现异常", e);
                }
            }
            return resultValue;
        } catch (Exception e) {
            log.error("[getAllAnimalCount]出现异常", e);
        }
        return 0;
    }
3、Task 任务

同上Callable的实现

代码语言:javascript
复制
/**
 * 查询动物数量任务 Callable版本
 *
 * @author XXX
 * @date 2020/05/14
 */
@Data
@Slf4j
public class AnimalCountTask implements Callable<Long> {

    /**
     * 账号
     */
    private String account;

    /**
     * 账户类型
     */
    private int accountType;

    /**
     * 动物类型
     */
    private int animalType;

    public AnimalCountTask(String account, int accountType, int animalType) {
        this.account = account;
        this.accountType = accountType;
        this.animalType= animalType;
    }


    @Override
    public Long call() throws Exception {
         AllTypeEnum typeEnum = AllTypeEnum.getEnumByType(getType());
            if (typeEnum != null) {
                //获取具体业务Bean
                CommonAnimalService commonAnimalService = SpringContext.getBean("commonAnimalServiceImpl");
                long num = commonAnimalService.countAnimalNum(getAccount(), getWarningType());  
                return num;       
            }
        }
       return null;
    }
}

说一下,Future.get()取结果时,为什么try catch异常。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 业务描述:
  • 工作方案:
  • 具体实现 (线程池 + 线程 + CountDownLatch ):
    • 1、配置线程池
      • 2、Service 实现
        • 3、线程的定义
        • 总结 : 
        • 5月22日补充:
        • 线程池 +Callable + Future的方式
          • 1、线程池的配置
            • 2、Service 实现
              • 3、Task 任务
              • 线程池 +Callable + ExecutorCompletionService 的方式:
                • 1、线程池的配置
                  • 2、Service实现
                    • 3、Task 任务
                    相关产品与服务
                    容器服务
                    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档