专栏首页服务端技术杂谈采用线程池进行异步任务处理

采用线程池进行异步任务处理

创建线程池

阿里JAVA编码规约,建议采用ThreadPoolExecutor创建线程池。

    private static ExecutorService simpleExecutorService = new ThreadPoolExecutor(            200,            300,            0L,
            TimeUnit.MICROSECONDS,            new LinkedBlockingDeque<Runnable>(10000),            new ThreadPoolExecutor.DiscardPolicy());

在同步操作过程中通过线程池完成异步操作

public void doSomething(final String message) {        simpleExecutorService.execute(new Runnable() {            @Override
            public void run() {                try {
                    Thread.sleep(3000);
                    System.out.println("step 2");
                    System.out.println("message=>" + message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });        System.out.println("step 1");
    }

进行测试

ThreadUtil threadUtil = new ThreadUtil();
threadUtil.doSomething("a thread pool demo");

输出结果

step 1
step 2message=>a thread pool demo

@Async

在Spring3.x之后框架已经支持采用@Async注解进行异步执行了。

被@Async修饰的方法叫做异步方法,这些异步方法会在新的线程中进行处理,不影响主线程的顺序执行。

无返回值执行

@Component@Slf4jpublic class AsyncTask {    @Async
    public void dealNoReturnTask(){        log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
    }
}

进行调用:

@SpringBootTest(classes = SpringbootApplication.class)@RunWith(SpringJUnit4ClassRunner.class)@Slf4jpublic class AsyncTest {    @Autowired
    private AsyncTask asyncTask;    @Test
    public void testDealNoReturnTask(){
        asyncTask.dealNoReturnTask();        try {
            log.info("begin to deal other Task!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

有返回值执行

    @Async
    public Future<String> dealHaveReturnTask() {        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("thread", Thread.currentThread().getName());
        jsonObject.put("time", System.currentTimeMillis());        return new AsyncResult<String>(jsonObject.toJSONString());
    }

判断任务是否取消:

    @Test    public void testDealHaveReturnTask() throws Exception {

        Future<String> future = asyncTask.dealHaveReturnTask();        log.info("begin to deal other Task!");        while (true) {            if(future.isCancelled()){                log.info("deal async task is Cancelled");                break;
            }            if (future.isDone() ) {                log.info("deal async task is Done");                log.info("return result is " + future.get());                break;
            }            log.info("wait async task to end ...");
            Thread.sleep(1000);
        }
    }

异步执行结果异常处理

我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现 在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(), 不然在调用时会报线程池未初始化的异常。 如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化

@Configuration@EnableAsync@Slf4jpublic class AsyncConfig implements AsyncConfigurer {//    @Bean//    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//        executor.setCorePoolSize(10);//        executor.setMaxPoolSize(100);//        executor.setQueueCapacity(100);//        return executor;//    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsyncExecutorThread-");
        executor.initialize(); //如果不初始化,导致找到不到执行器
        return executor;
    }    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

异步异常处理类:

@Slf4jpublic class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));        if (ex instanceof AsyncException) {
            AsyncException asyncException = (AsyncException) ex;
            log.info("asyncException:{}",asyncException.getErrorMessage());
        }

        log.info("Exception :");
        ex.printStackTrace();
    }
}@Data@AllArgsConstructorpublic class AsyncException extends Exception {    private int code;    private String errorMessage;
}
  • 在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
  • 在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。

Future或FutureTask

需要结合Callable

public class CallableDemo implements Callable<Integer> {    private int sum;    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子线程开始计算啦!");
        Thread.sleep(2000);        for(int i=0 ;i<5000;i++){
            sum=sum+i;
        }
        System.out.println("Callable子线程计算结束!");        return sum;
    }
}

Future模式

        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //提交任务并获取执行结果
        Future<Integer> future = es.submit(calTask);        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");

FutureTask模式

        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //创建FutureTask
        FutureTask<Integer> future = new FutureTask<>(calTask);        // future.run();  // 由于FutureTask继承于Runable,所以也可以直接调用run方法执行
        //执行任务
        es.submit(future); // 效果同上面直接调用run方法

        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");

归并异步执行结果

public class FutureDemo{ 
     public static void main(String[] args)  {
         Long start = System.currentTimeMillis();         //开启多线程
         ExecutorService exs = Executors.newFixedThreadPool(10);         try {             //结果集
             List<Integer> list = new ArrayList<Integer>();
             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();             //1.高速提交10个任务,每个任务返回一个Future入list
             for(int i=0;i<10;i++){
                 futureList.add(exs.submit(new CallableTask(i+1)));
             }

             Long getResultStart = System.currentTimeMillis();
             System.out.println("结果归集开始时间="+new Date());             //2.结果归集,遍历futureList,高速轮询(模拟实现了并发)获取future状态成功完成后获取结果,退出当前循环
             for (Future<Integer> future : futureList) {                  //CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                 while (true) {                    //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。 
                    if (future.isDone()&& !future.isCancelled()) {
                         Integer i = future.get();//获取结果
                         System.out.println("任务i="+i+"获取完成!"+new Date());
                         list.add(i);                         break;//当前future获取结果完毕,跳出while
                     } else {                          //每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                         Thread.sleep(1);
                     }
                 }
             }

             System.out.println("list="+list);

             System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart));
            
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             exs.shutdown();
         }
     }
     static class CallableTask implements Callable<Integer>{
         Integer i;         
         public CallableTask(Integer i) {             super();             this.i=i;
         } 
         @Override
         public Integer call() throws Exception {             if(i==1){
                 Thread.sleep(3000);//任务1耗时3秒
             }else if(i==5){
                 Thread.sleep(5000);//任务5耗时5秒
             }else{
                 Thread.sleep(1000);//其它任务耗时1秒
             }
             System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!");  
             return i;
         }
     }
 }

© 著作权归作者所有

本文分享自微信公众号 - 服务端技术杂谈(ITIBB2014),作者:春哥大魔王

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-05-07

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • BeeGo web搭建–环境配置

    学习一个新框架,对于有较多编程经验的开发人员写代码不是问题,但是往往卡在了环境配置或者第一段Hello World,今天介绍一下Beego web框架的搭建和环...

    春哥大魔王
  • 在成为CTO之前,程序员怎样赚外快?

    作为一个码code的程序员,虽然可能没有朋友,比较宅,但是整体花销往往不比正常人少。VPS,域名,MAC还有一堆的收费软件,数码设备等,都是卖肾的节奏。 当然作...

    春哥大魔王
  • 自己动手撸个markdown渲染器

    春哥大魔王
  • 房上的猫:吃货联盟项目

    一.首先先定义部分成员变量: String[] name = new String[4];// 订餐人 String[] greens = new St...

    房上的猫
  • 算法题3

    py3study
  • 高并发 threadlocal+countDownLatch+线程池走起来

    gfu
  • Java 8 日期时间 API

    java 8 通过发布新的Date-Time API (JSR 310)来进一步加强对日期和时间的处理。

    一滴水的眼泪
  • 孔子装爹案例_帮助理解多态的成员访问特点及转型

    class 孔子爹 {   public int age = 40;   public void teach() {     System....

    黑泽君
  • JAVA|Java方法的使用

    方法从简来说就是,把一个功能单独放在大括号内,当需要这个功能的时候我们直接调用方法,这样不仅实现了代码的复用,还解决了代码冗余的问题。

    算法与编程之美
  • Java底层-JMX

    引言部分摘自百度百科,实际上JMX是java5开始提供的对java应用进行监控的一套接口,或者我们也可以像理解JUC包一样理解JMX,把它当成一个框架。JMX这...

    每天学Java

扫码关注云+社区

领取腾讯云代金券