前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >采用线程池进行异步任务处理

采用线程池进行异步任务处理

作者头像
春哥大魔王
发布2018-07-23 11:31:30
3.1K0
发布2018-07-23 11:31:30
举报

创建线程池

阿里JAVA编码规约,建议采用ThreadPoolExecutor创建线程池。

代码语言:javascript
复制
    private static ExecutorService simpleExecutorService = new ThreadPoolExecutor(            200,            300,            0L,
            TimeUnit.MICROSECONDS,            new LinkedBlockingDeque<Runnable>(10000),            new ThreadPoolExecutor.DiscardPolicy());

在同步操作过程中通过线程池完成异步操作

代码语言:javascript
复制
public void doSomething(final String message) {        simpleExecutorService.execute(new Runnable() {            @Override
            public void run() {                try {
                    Thread.sleep(3000);
                    System.out.println("step 2");
                    System.out.println("message=>" + message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });        System.out.println("step 1");
    }

进行测试

代码语言:javascript
复制
ThreadUtil threadUtil = new ThreadUtil();
threadUtil.doSomething("a thread pool demo");

输出结果

代码语言:javascript
复制
step 1
step 2message=>a thread pool demo

@Async

在Spring3.x之后框架已经支持采用@Async注解进行异步执行了。

被@Async修饰的方法叫做异步方法,这些异步方法会在新的线程中进行处理,不影响主线程的顺序执行。

无返回值执行
代码语言:javascript
复制
@Component@Slf4jpublic class AsyncTask {    @Async
    public void dealNoReturnTask(){        log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
    }
}

进行调用:

代码语言:javascript
复制
@SpringBootTest(classes = SpringbootApplication.class)@RunWith(SpringJUnit4ClassRunner.class)@Slf4jpublic class AsyncTest {    @Autowired
    private AsyncTask asyncTask;    @Test
    public void testDealNoReturnTask(){
        asyncTask.dealNoReturnTask();        try {
            log.info("begin to deal other Task!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
有返回值执行
代码语言:javascript
复制
    @Async
    public Future<String> dealHaveReturnTask() {        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("thread", Thread.currentThread().getName());
        jsonObject.put("time", System.currentTimeMillis());        return new AsyncResult<String>(jsonObject.toJSONString());
    }

判断任务是否取消:

代码语言:javascript
复制
    @Test    public void testDealHaveReturnTask() throws Exception {

        Future<String> future = asyncTask.dealHaveReturnTask();        log.info("begin to deal other Task!");        while (true) {            if(future.isCancelled()){                log.info("deal async task is Cancelled");                break;
            }            if (future.isDone() ) {                log.info("deal async task is Done");                log.info("return result is " + future.get());                break;
            }            log.info("wait async task to end ...");
            Thread.sleep(1000);
        }
    }
异步执行结果异常处理

我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现 在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(), 不然在调用时会报线程池未初始化的异常。 如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化

代码语言:javascript
复制
@Configuration@EnableAsync@Slf4jpublic class AsyncConfig implements AsyncConfigurer {//    @Bean//    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//        executor.setCorePoolSize(10);//        executor.setMaxPoolSize(100);//        executor.setQueueCapacity(100);//        return executor;//    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsyncExecutorThread-");
        executor.initialize(); //如果不初始化,导致找到不到执行器
        return executor;
    }    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

异步异常处理类:

代码语言:javascript
复制
@Slf4jpublic class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));        if (ex instanceof AsyncException) {
            AsyncException asyncException = (AsyncException) ex;
            log.info("asyncException:{}",asyncException.getErrorMessage());
        }

        log.info("Exception :");
        ex.printStackTrace();
    }
}@Data@AllArgsConstructorpublic class AsyncException extends Exception {    private int code;    private String errorMessage;
}
  • 在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
  • 在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。

Future或FutureTask

需要结合Callable

代码语言:javascript
复制
public class CallableDemo implements Callable<Integer> {    private int sum;    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子线程开始计算啦!");
        Thread.sleep(2000);        for(int i=0 ;i<5000;i++){
            sum=sum+i;
        }
        System.out.println("Callable子线程计算结束!");        return sum;
    }
}
Future模式
代码语言:javascript
复制
        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //提交任务并获取执行结果
        Future<Integer> future = es.submit(calTask);        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");
FutureTask模式
代码语言:javascript
复制
        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //创建FutureTask
        FutureTask<Integer> future = new FutureTask<>(calTask);        // future.run();  // 由于FutureTask继承于Runable,所以也可以直接调用run方法执行
        //执行任务
        es.submit(future); // 效果同上面直接调用run方法

        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");

归并异步执行结果

代码语言:javascript
复制
public class FutureDemo{ 
     public static void main(String[] args)  {
         Long start = System.currentTimeMillis();         //开启多线程
         ExecutorService exs = Executors.newFixedThreadPool(10);         try {             //结果集
             List<Integer> list = new ArrayList<Integer>();
             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();             //1.高速提交10个任务,每个任务返回一个Future入list
             for(int i=0;i<10;i++){
                 futureList.add(exs.submit(new CallableTask(i+1)));
             }

             Long getResultStart = System.currentTimeMillis();
             System.out.println("结果归集开始时间="+new Date());             //2.结果归集,遍历futureList,高速轮询(模拟实现了并发)获取future状态成功完成后获取结果,退出当前循环
             for (Future<Integer> future : futureList) {                  //CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                 while (true) {                    //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。 
                    if (future.isDone()&& !future.isCancelled()) {
                         Integer i = future.get();//获取结果
                         System.out.println("任务i="+i+"获取完成!"+new Date());
                         list.add(i);                         break;//当前future获取结果完毕,跳出while
                     } else {                          //每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                         Thread.sleep(1);
                     }
                 }
             }

             System.out.println("list="+list);

             System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart));
            
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             exs.shutdown();
         }
     }
代码语言:javascript
复制
     static class CallableTask implements Callable<Integer>{
         Integer i;         
         public CallableTask(Integer i) {             super();             this.i=i;
         } 
         @Override
         public Integer call() throws Exception {             if(i==1){
                 Thread.sleep(3000);//任务1耗时3秒
             }else if(i==5){
                 Thread.sleep(5000);//任务5耗时5秒
             }else{
                 Thread.sleep(1000);//其它任务耗时1秒
             }
             System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!");  
             return i;
         }
     }
 }

© 著作权归作者所有

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-05-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 春哥talk 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 创建线程池
  • 在同步操作过程中通过线程池完成异步操作
  • 进行测试
  • 输出结果
  • @Async
    • 无返回值执行
      • 有返回值执行
        • 异步执行结果异常处理
        • Future或FutureTask
          • Future模式
            • FutureTask模式
            • 归并异步执行结果
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档