springboot创建及使用多线程的几种方式

在数据处理中,多线程用到的场景很多,在满足计算机CPU处理能力的情况下,使用多线程可以明显提高程序运行效率,缩短大数据处理的能力。作为java程序开发,离不开spring,那么在spring中怎么创建多线程并将注册到spring的类在多线程中使用呢?我自己总结了一下,可以有两种方式,使用线程池和spring自带多线程注解使用。

使用线程池

我一般使用固定线程数量的线程池,假如数据量很大,我会将数据放到一个大集合中,然后按照一定的比例分配数目,同时我自己写了一个分页类,线程的数量可以根据分页类来自动调整。看代码: 分页类

package com.yiche.tag.model;

import java.util.List;

public class DataPage {
    private int page = 1; // 当前页

    public int totalPages = 0; // 总页数

    private int pageRecorders;// 每页5条数据

    private int totalRows = 0; // 总数据数

    private int pageStartRow = 0;// 每页的起始数

    private int pageEndRow = 0; // 每页显示数据的终止数

    private boolean hasNextPage = false; // 是否有下一页

    private boolean hasPreviousPage = false; // 是否有前一页

    private List list;

    // private Iterator it;

    public DataPage(List list, int pageRecorders) {
        init(list, pageRecorders);// 通过对象集,记录总数划分
    }

    /** *//**
     * 初始化list,并告之该list每页的记录数
     * @param list
     * @param pageRecorders
     */
    public void init(List list, int pageRecorders) {
        this.pageRecorders = pageRecorders;
        this.list = list;
        totalRows = list.size();
        // it = list.iterator();
        hasPreviousPage = false;
        if ((totalRows % pageRecorders) == 0) {
            totalPages = totalRows / pageRecorders;
        } else {
            totalPages = totalRows / pageRecorders + 1;
        }

        if (page >= totalPages) {
            hasNextPage = false;
        } else {
            hasNextPage = true;
        }

        if (totalRows < pageRecorders) {
            this.pageStartRow = 0;
            this.pageEndRow = totalRows;
        } else {
            this.pageStartRow = 0;
            this.pageEndRow = pageRecorders;
        }
    }


    // 判断要不要分页
    public boolean isNext() {
        return list.size() > 5;
    }

    public void setHasPreviousPage(boolean hasPreviousPage) {
        this.hasPreviousPage = hasPreviousPage;
    }

    public String toString(int temp) {
        String str = Integer.toString(temp);
        return str;
    }

    public void description() {

        String description = "共有数据数:" + this.getTotalRows() +

                "共有页数: " + this.getTotalPages() +

                "当前页数为:" + this.getPage() +

                " 是否有前一页: " + this.isHasPreviousPage() +

                " 是否有下一页:" + this.isHasNextPage() +

                " 开始行数:" + this.getPageStartRow() +

                " 终止行数:" + this.getPageEndRow();

        System.out.println(description);
    }

    public List getNextPage() {
        page = page + 1;

        disposePage();

        System.out.println("用户凋用的是第" + page + "页");
        this.description();
        return getObjects(page);
    }

    /** *//**
     * 处理分页
     */
    private void disposePage() {

        if (page == 0) {
            page = 1;
        }

        if ((page - 1) > 0) {
            hasPreviousPage = true;
        } else {
            hasPreviousPage = false;
        }

        if (page >= totalPages) {
            hasNextPage = false;
        } else {
            hasNextPage = true;
        }
    }

    public List getPreviousPage() {

        page = page - 1;

        if ((page - 1) > 0) {
            hasPreviousPage = true;
        } else {
            hasPreviousPage = false;
        }
        if (page >= totalPages) {
            hasNextPage = false;
        } else {
            hasNextPage = true;
        }
        this.description();
        return getObjects(page);
    }

    /** *//**
     * 获取第几页的内容
     *
     * @param page 从1开始
     * @return
     */
    public List getObjects(int page) {
        if (page == 0)
            this.setPage(1);
        else
            this.setPage(page);
        this.disposePage();
        if (page * pageRecorders < totalRows) {// 判断是否为最后一页
            pageEndRow = page * pageRecorders;
            pageStartRow = pageEndRow - pageRecorders;
        } else {
            pageEndRow = totalRows;
            pageStartRow = pageRecorders * (totalPages - 1);
        }

        List objects = null;
        if (!list.isEmpty()) {
            objects = list.subList(pageStartRow, pageEndRow);
        }
        //this.description();
        return objects;
    }

    public List getFistPage() {
        if (this.isNext()) {
            return list.subList(0, pageRecorders);
        } else {
            return list;
        }
    }

    public boolean isHasNextPage() {
        return hasNextPage;
    }


    public void setHasNextPage(boolean hasNextPage) {
        this.hasNextPage = hasNextPage;
    }


    public List getList() {
        return list;
    }


    public void setList(List list) {
        this.list = list;
    }


    public int getPage() {
        return page;
    }


    public void setPage(int page) {
        this.page = page;
    }


    public int getPageEndRow() {
        return pageEndRow;
    }


    public void setPageEndRow(int pageEndRow) {
        this.pageEndRow = pageEndRow;
    }


    public int getPageRecorders() {
        return pageRecorders;
    }


    public void setPageRecorders(int pageRecorders) {
        this.pageRecorders = pageRecorders;
    }


    public int getPageStartRow() {
        return pageStartRow;
    }


    public void setPageStartRow(int pageStartRow) {
        this.pageStartRow = pageStartRow;
    }


    public int getTotalPages() {
        return totalPages;
    }


    public void setTotalPages(int totalPages) {
        this.totalPages = totalPages;
    }


    public int getTotalRows() {
        return totalRows;
    }


    public void setTotalRows(int totalRows) {
        this.totalRows = totalRows;
    }


    public boolean isHasPreviousPage() {
        return hasPreviousPage;
    }

}

这个分页类可以多个项目复用,既可以实现分页,还可以用到线程池中给线程分配数量使用。以下代码为如何使用该分页类给线程池使用的。

List<Integer> carModelIds = carsTagService.getAllCarModelIds();
        DataPage dataPage = new DataPage(carModelIds, 300);
        ExecutorService executorService = Executors.newFixedThreadPool(dataPage.totalPages);
        for (int i = 0; i < dataPage.totalPages; i++) {
            final int index = i + 1;
            executorService.execute(new Runnable() {
                List<Integer> temp = dataPage.getObjects(index);
                @Override
                public void run() {
                    try {
                        if (temp != null && temp.size() > 0) {
                            temp=Collections.synchronizedList(temp);
                            int sum = 0;
                            for (Integer carId : temp) {
                                try {
                                    carsTagService.insertCarTagByCarId(carId);
                                    sum++;
                                    if (sum % 100 == 0) {
                                        System.out.println("线程" + index + "当前执行了" + sum + "条,当前进度是" + sum * 100/ temp.size() + "%");
                                    }
                                } catch (Exception ex) {
                                    ex.printStackTrace();
                                }
                            }
                            System.out.println("线程" + index + "执行CarTagInitTask任务成功:" + temp.size());
                        } else {
                            System.out.println("CarTagInitTask任务执行完成:未读取到数据!");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
        while (true) {
            if (executorService.isTerminated()) {
                System.out.println("所有的子线程都结束了!");
                break;
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

上述代码中carModelIds就是要处理的数据集合,数目很大,如果直接for循环将会消耗很多的时间,利用线程池可以解决这个问题。但是如果直接创建多线程,线程中使用的对象需要final修饰,这对于spring管理的类不适用。使用线程池可以解决这个问题。

使用springboot自带@Async注解创建异步线程

在springboot中,可以使用@Async注解来将一个方法设置为异步方法,调用该方法的时候,是新开一个线程去调用。代码如下:

@Component
public class Task {
    @Async
    public void doTaskOne() throws Exception {
       System.out.println("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
    }
    @Async
    public void doTaskTwo() throws Exception {
        System.out.println("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
    }
    @Async
    public void doTaskThree() throws Exception {
       System.out.println("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
    }
}

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

此时可以反复执行单元测试,您可能会遇到各种不同的结果,比如:

  • 没有任何任务相关的输出
  • 有部分任务相关的输出
  • 乱序的任务相关的输出

原因是目前doTaskOne、doTaskTwo、doTaskThree三个函数的时候已经是异步执行了。主程序在异步调用之后,主程序并不会理会这三个函数是否执行完成了,由于没有其他需要执行的内容,所以程序就自动结束了,导致了不完整或是没有输出任务相关内容的情况。

注: @Async所修饰的函数不要定义为static类型,这样异步调用不会生效

异步回调

为了让doTaskOne、doTaskTwo、doTaskThree能正常结束,假设我们需要统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。

那么我们如何判断上述三个异步调用是否已经执行完成呢?我们需要使用Future<T>来返回异步调用的结果,就像如下方式改造doTaskOne函数:

@Async
public Future<String> doTaskOne() throws Exception {
    System.out.println("开始做任务一");
    long start = System.currentTimeMillis();
    Thread.sleep(random.nextInt(10000));
    long end = System.currentTimeMillis();
    System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
    return new AsyncResult<>("任务一完成");
}

按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情

@Test
public void test() throws Exception {
    long start = System.currentTimeMillis();
    Future<String> task1 = task.doTaskOne();
    Future<String> task2 = task.doTaskTwo();
    Future<String> task3 = task.doTaskThree();
    while(true) {
        if(task1.isDone() && task2.isDone() && task3.isDone()) {
            // 三个任务都调用完成,退出循环等待
            break;
        }
        Thread.sleep(1000);
    }
    long end = System.currentTimeMillis();
    System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
}

看看我们做了哪些改变:

在测试用例一开始记录开始时间 在调用三个异步函数的时候,返回Future<String>类型的结果对象 在调用完三个异步函数之后,开启一个循环,根据返回的Future<String>对象来判断三个异步函数是否都结束了。若都结束,就结束循环;若没有都结束,就等1秒后再判断。 跳出循环之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。 执行一下上述的单元测试,可以看到如下结果:

开始做任务一 开始做任务二 开始做任务三 完成任务三,耗时:37毫秒 完成任务二,耗时:3661毫秒 完成任务一,耗时:7149毫秒 任务全部完成,总耗时:8025毫秒

需要注意,spring管理的异步线程数量有限,如果是web项目的话,线程数量由tomcat的线程池配置有关系,所以如果可以,最好自己配置线程配置类。

/**
 * springboot里面创建异步线程配置类
 * @author kouyy
 */
@Configuration
@EnableAsync
public class ThreadAsyncConfigurer implements AsyncConfigurer {

    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数
        threadPool.setCorePoolSize(10);
        //设置最大线程数
        threadPool.setMaxPoolSize(100);
        //线程池所使用的缓冲队列
        threadPool.setQueueCapacity(10);
        //等待任务在关机时完成--表明等待所有线程执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
        threadPool.setAwaitTerminationSeconds(60);
        //  线程名称前缀
        threadPool.setThreadNamePrefix("MyAsync-");
        // 初始化线程
        threadPool.initialize();
        return threadPool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

配置类创建之后,以后再使用@Async创建异步线程就可以按照自己配置来使用了。不用担心和spring线程池数量冲突了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

2018-04-06 JDK 新特性总览

JDK5新特性 自动装箱与拆箱 枚举 静态导入 可变参数(Varargs) 内省(introspector) 泛型(Generic) For-Eac...

3214
来自专栏菩提树下的杨过

利用sharding-jdbc分库分表

sharding-jdbc是当当开源的一款分库分表的数据访问层框架,能对mysql很方便的分库、分表,基本不用修改原有代码,只要配置一下即可,完整的配置参考以下...

4837
来自专栏木木玲

Netty 源码解析 ——— ChannelConfig 和 Attribute

2082
来自专栏阮一峰的网络日志

Generator 函数的含义与用法

本文是《深入掌握 ECMAScript 6 异步编程》系列文章的第一篇。 Generator函数的含义与用法 Thunk函数的含义与用法 co函数库的含义与...

3636
来自专栏JavaEdge

JedisSentinelPool源码分析

1. 概述 Redis-Sentinel作为官方推荐的HA解决方案,Jedis也在客户端角度实现了对Sentinel的支持,主要实现在JedisSentinel...

3484
来自专栏Java 源码分析

JavaWeb基础

1. XML xml一般就用来存放少量的数据,或者是作为配置文件。 xml的声明<?xml version=”1.0” encoding=”utf-8”?> ...

3045
来自专栏一名合格java开发的自我修养

spring常用注解使用解析

spring没有采用约定优于配置的策略,spring要求显示指定搜索哪些路径下的Java文件。spring将会把合适的java类全部注册成spring Bean...

741
来自专栏Albert陈凯

Hadoop数据分析平台实战——140Hive函数以及自定义函数讲解离线数据分析平台实战——140Hive函数以及自定义函数讲解

离线数据分析平台实战——140Hive函数以及自定义函数讲解 Hive函数介绍 HQL内嵌函数只有195个函数(包括操作符,使用命令show functions...

2888
来自专栏大内老A

ASP.NET Core中的依赖注入(2):依赖注入(DI)

IoC主要体现了这样一种设计思想:通过将一组通用流程的控制从应用转移到框架之中以实现对流程的复用,同时采用“好莱坞原则”是应用程序以被动的方式实现对流程的定制。...

2178
来自专栏老马说编程

(72) 显式条件 / 计算机程序的思维逻辑

上节我们介绍了显式锁,本节介绍关联的显式条件,介绍其用法和原理。显式条件也可以被称做条件变量、条件队列、或条件,后文我们可能会交替使用。 用法 基本概念和方法...

1846

扫码关注云+社区