前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >异步任务编排神器CompletableFuture

异步任务编排神器CompletableFuture

原创
作者头像
菜菜的后端私房菜
发布2024-08-15 09:18:57
2370
发布2024-08-15 09:18:57
举报
文章被收录于专栏:深入浅出Java并发编程

异步任务编排神器CompletableFuture

当需要获取异步任务的结果时,通常可以通过Future接口的get方法来获取结果

但是当异步任务繁多并且复杂,任务间可能存在依赖关系时,Future接口变得不太好用

比如任务A完成后串行执行任务B,等到B、C任务都完成后执行D任务,等到D、E、F任务都完成后汇总结果返回

当遇到复杂的异步任务编排时,Future不太好用,但是在JDK8中并发包推出的CompletableFuture能够很方便的处理这种异步编排任务

image.png
image.png

比如在一个页面需要查询多个服务的数据,如果同步查询会导致性能太慢

异步查询多个服务的数据再汇总返回,则能提高更多的性能

API

这里的API只作简单说明,大概分下类,各个分类下具体API的功能可自行查看文档(或者用到时再自行查看文档)

CompletableFuture提供的API大概分为几个大类:

同步与异步、串行、AND、OR、

同步与异步

**API携带Async则说明是异步,并且可以设置线程池**

一般业务开发,CompletableFuture用于处理IO任务,最好使用异步,并且指定线程池

代码语言:java
复制
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

            System.out.println("task a run");

            return "a";

});

串行

串行执行指的是任务需要同步执行,如图中的A、B任务,需要A任务执行完才能执行B任务

**串行API通常以then开头**,如:thenRunAsync、thenAccpetAsync、thenApplyAsync

代码语言:java
复制
CompletableFuture<String> taskB = taskA.thenApply((s) -> {

    System.out.println("task b run");

    return s + "b";

})

AND

AND指的是需要两个任务都完成,才能继续执行后续的任务,比如图中的B、C任务,要都完成才能执行D任务

**AND相关API通常以Combine、Both有关**,如:thenCombineAsync、thenAcceptBothAsync、runAfterBothAsync

代码语言:java
复制
CompletableFuture<String> taskD = taskB.thenCombineAsync(taskC, (b, c) -> {

    System.out.println("task d run");

    return b + c;

})

如果依赖多个任务同时完成,可以使用allOf(如图中的D、F、E任务)

代码语言:java
复制
CompletableFuture.allOf(taskF,taskE,taskD);

OR

OR指的是两个任务中其中一个完成,就可以继续执行后续任务

OR相关API通常以Either有关:applyToEitherAsync、acceptEitherAsync、runAfterEitherAsync

如果依赖多个任务的OR时使用:CompletableFuture.anyOf

异常处理

任务执行过程中可能出现异常,可以通过exceptionally 、whenComplete、handler等API对异常进行处理

代码语言:java
复制
CompletableFuture<String> taskF = CompletableFuture.supplyAsync(() -> {

    System.out.println("task f run");

    return "a";

}).exceptionally(e -> {

    System.out.println("出现异常");

    throw new RuntimeException("error");

});

注意事项

使用CompletableFuture时需要注意,如果不了解原理容易踩坑:

比如:任务出了异常怎么办?任务如何选择线程池的?线程又是如何执行的?

带着这一系列问题,我们往下看

出了异常怎么办?

使用CompletableFuture进行异步编排任务时,任务可能出现异常,因此**必须使用API进行处理**

**CompletableFuture遇到异常时,可能会使用CompletionException或ExecutionException包装异常**

代码语言:java
复制
public static void exception() {

    CompletableFuture<Void> taskException = CompletableFuture.supplyAsync(() -> {

        System.out.println("begin");

        return null;

    });



    taskException

            .thenApply(result -> {

                int i = 1 / 0;

                return i;

            })

            .exceptionally(err -> {

                //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

                System.out.println(err);



                //java.lang.ArithmeticException: / by zero

                System.out.println(err.getCause());



                //java.lang.ArithmeticException: / by zero

                //使用工具处理异常

                System.out.println(getException(err));

                return 0;

            });

    

}

因为异常会被包装,因此处理异常时,**最好使用工具类获取异常**

代码语言:java
复制
public static Throwable getException(Throwable throwable) {

    //异常为CompletionException或ExecutionException,并且Cause不为空时解析

    if ((throwable instanceof CompletionException或 || throwable instanceof ExecutionException)

            && Objects.nonNull(throwable.getCause())) {

        return throwable.getCause();

    }

    return throwable;

}
如何选择线程池?

CompletableFuture中选择线程池有三种情况:

  1. **使用方法时指定线程池**
  2. **未指定线程池时,使用ForkJoin的公共线程池 ForkJoinPool.commonPool() (适合CPU任务,最大线程数量 = CPU - 1)**
  3. **未指定线程池时,使用 ThreadPerTaskExecutor 每次执行任务时创建一个线程执行 (适合周期长的任务,创建/销毁线程开销大)**

当未指定线程池时,可能使用ForkJoin的线程池也可能使用ThreadPerTaskExecutor,在没有查看源码的情况下会容易踩坑

并且 ThreadPerTaskExecutorForkJoinPool.commonPool() 都不适合IO任务

接下来一步步查看源码,分析CompletableFuture什么情况下会选择哪种线程池

CompletableFuture.supplyAsync

代码语言:java
复制
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

    return asyncSupplyStage(asyncPool, supplier);

}

当我们使用未指定线程池的方法时,会直接使用asyncPool作为线程池

代码语言:java
复制
private static final Executor asyncPool = useCommonPool ?

    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

**asyncPool根据useCommonPool来判断是使用 ForkJoinPool.commonPool() 还是使用 ThreadPerTaskExecutor**

那么useCommonPool是如何确定的呢?我们继续往下查看

代码语言:java
复制
private static final boolean useCommonPool =

    (ForkJoinPool.getCommonPoolParallelism() > 1);

能否使用useCommonPool,由 ForkJoinPool.getCommonPoolParallelism() 决定,当它大于1时则使用 ForkJoinPool.commonPool() 否则使用 ThreadPerTaskExecutor

ForkJoinPool.getCommonPoolParallelism() 返回字段 commonParallelism

代码语言:java
复制
static final int commonParallelism;

commonParallelism 用于表示ForkJoinPool的并行粒度,在ForkJoinPool静态代码块中赋值初始化

ForkJoinPool.static

代码语言:java
复制
static {

    //其他略...

    

    //创建公共池

    common = java.security.AccessController.doPrivileged

        (new java.security.PrivilegedAction<ForkJoinPool>() {

            public ForkJoinPool run() { return makeCommonPool(); }});

    

    //计算并行粒度

    int par = common.config & SMASK; // report 1 even if threads disabled

    commonParallelism = par > 0 ? par : 1;

}

commonParallelism 并发粒度的字段由par决定,而par = common.config & SMASK

其中SMASK为65535(十进制),其二进制为全1,因此由 common 的字段 config 决定

(在创建公共池的过程会设置config字段)

ForkJoinPool.makeCommonPool

在创建公共池的代码中主要观察变量 parallelism 它为并发粒度

如果不携带参数,**默认情况下并发粒度为CPU核数-1**

代码语言:java
复制
private static ForkJoinPool makeCommonPool() {



    final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =

            new CommonPoolForkJoinWorkerThreadFactory();

    //初始化并发粒度为-1

    int parallelism = -1;

    ForkJoinWorkerThreadFactory factory = null;

    UncaughtExceptionHandler handler = null;

    try {  // ignore exceptions in accessing/parsing properties

        String pp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.parallelism");

        String fp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.threadFactory");

        String hp = System.getProperty

            ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");

        if (pp != null)

            //如果携带启动参数则设置为对应的并发粒度

            parallelism = Integer.parseInt(pp);

        if (fp != null)

            factory = ((ForkJoinWorkerThreadFactory)ClassLoader.

                       getSystemClassLoader().loadClass(fp).newInstance());

        if (hp != null)

            handler = ((UncaughtExceptionHandler)ClassLoader.

                       getSystemClassLoader().loadClass(hp).newInstance());

    } catch (Exception ignore) {

    }

    if (factory == null) {

        if (System.getSecurityManager() == null)

            factory = commonPoolForkJoinWorkerThreadFactory;

        else // use security-managed default

            factory = new InnocuousForkJoinWorkerThreadFactory();

    }

    if (parallelism < 0 && // default 1 less than #cores

        //默认情况下并发粒度 = CPU核数 - 1

        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)

        parallelism = 1;

    if (parallelism > MAX\_CAP)

        parallelism = MAX\_CAP;

    return new ForkJoinPool(parallelism, factory, handler, LIFO\_QUEUE,

                            "ForkJoinPool.commonPool-worker-");

}

在构建对象时,config字段 this.config = (parallelism & SMASK) | mode

其中SMASK为全1,mode为0,得到的结果是不变的,因此config的值就是parallelism并发粒度

至此我们可以得出结论:**默认情况下,如果不指定线程池,当CPU核数-1超过1则会使用ForkJoin公共池(最大线程数量 = CPU核数 - 1),否则使用ThreadPerTaskExecutor(每次执行都创建线程执行)**

代码语言:java
复制
static final class ThreadPerTaskExecutor implements Executor {

    public void execute(Runnable r) { new Thread(r).start(); }

}

ThreadPerTaskExecutor只适合执行周期长的任务,如果任务周期短,并且多的情况下,创建线程也会是很大一笔开销

**使用CompletableFuture时务必指定线程池,线程池最好根据业务做好隔离**

**如果不指定线程池会根据CPU核数选择ForkJoinCommonPool或ThreadPerTaskExecutor,它们并不适合IO任务**

线程如何执行?

在同步与异步的API中线程如何执行?

**在异步的API中,如果指定线程池则交给线程池中的工作线程执行,否则选择Common Pool或ThreadPerTaskExecutor**

**在同步的API中,通常是当前线程进行执行任务,但如果任务B依赖的任务A未完成则由任务A的回调线程执行,任务A如果是异步则由线程池来执行**

代码语言:java
复制
public static void testSync() {

        CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

//            try {

//                Thread.sleep(5000);

//            } catch (InterruptedException e) {

//                throw new RuntimeException(e);

//            }

            return "ok";

        }, threadPool);





        CompletableFuture<Void> taskB = taskA.thenAccept(s -> {

            //任务A执行完(不睡时)由当前线程执行

            //任务A未执行完(睡眠时)由线程池的工作线程执行

            System.out.println(s);

            System.out.println(s);

        });



        taskB.join();

}

总结

**CompletableFuture提供串行、AND、OR、异常捕获、结果聚合等多种API,通过这些API能够更方便、快捷的实现异步任务的编排**

**使用CompletableFuture时务必对任务进行异常处理,并且它会使用CompletionException或ExecutionException包装异常,再打印异常时记得使用工具类处理,避免打印到包装的异常**

**CompletableFuture异步任务中如果指定线程池则直接使用指定的线程池**

**如果未指定线程池,当前服务器CPU数量小于等于2(并发粒度低)时使用ThreadPerTaskExecutor,其他情况(并发粒度高)使用ForkJoin框架的common pool(并发粒度 = CPU数量 - 1)**

**未指定线程池时使用的线程池适合CPU任务,并不适合IO任务,使用异步时务必指定线程池**

**当使用异步API时,由线程池的工作线程执行;使用同步API时,如果当前任务依赖的任务未完成,则有依赖、未完成的任务的线程来执行**

🌠最后(一键三连求求拉~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步任务编排神器CompletableFuture
    • API
      • 注意事项
        • 出了异常怎么办?
        • 如何选择线程池?
        • 线程如何执行?
      • 总结
        • 🌠最后(一键三连求求拉~)
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档