首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

CompletableFuture 使用手册

1、CompletableFuture 背景介绍

1.1、什么是CompletableFuture

     CompletableFuture是在JDK1.8提供了一种更加强大的异步编程的api。异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态是否完成、是否有异常信息等。

     CompletableFuture同时实现了Future接口和CompletionStage接口,也就是说Future的功能特性CompletableFuture也有;而同时拥有CompletionStage接口实现任务编排相关的功能。

1.2、为什么使用CompletableFuture

     讲解CompletableFuture 之前,我们可以先想想没有CompletableFuture出现之前我们是怎么做的?

1.2.1、Future接口介绍

     根据Oracle官方出具的Java文档说明,创建线程的方式只有两种:继承Thread或者实现Runnable接口。但是这两种方法都存在一个缺陷,没有返回值,也就是说我们无法得知线程执行结果。虽然简单场景下已经满足,但是当我们需要返回值的时候怎么办呢?

     于是在 Java 1.5 以后的Callable和Future接口就解决了这个问题,我们可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象,从此,我们的程序逻辑就不再是同步顺序。

因此我们想要开启异步线程,执行任务,获取结果,就变成这种实现:

public void future() { try { FutureTask futureTask = new FutureTask(() -> "future() 开始"); new Thread(futureTask).start(); System.out.println(futureTask.get()); } catch (Exception e) { LOGGER.error("future() error:", e); } }

或者使用线程池的方式

public void future2() { try { ExecutorService executorService = Executors.newFixedThreadPool(2); Future future = executorService.submit(() -> "future()2 开始"); System.out.println(future.get()); executorService.shutdown(); }catch (Exception e){ LOGGER.error("future2() error:", e); } }

使用线程池的方式其本质上也就将提交的Callable的实现先封装成FutureTask,然后通过submit方法来提交任务,来执行异步逻辑。

1.2.2、Future接口的局限性

     虽然我们可以通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。

     Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下:

public void future3() { try { ExecutorService executorService = Executors.newFixedThreadPool(3); Future future = executorService.submit(() -> "future()3 开始"); //进行任务是否完成判断 while (!future.isDone()) { //任务没有完成,没有就继续循环判断 } System.out.println(future.get()); executorService.shutdown(); } catch (Exception e) { LOGGER.error("future3() error:", e); } }

但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。

同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。

1.2.3、结论

     因此可以通过上面得到结论:虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

    而CompletableFuture的出现相比于Future一方面是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果,另一方面也极大扩展了原来future的使用场景,丰富了强大的API,能支持我们更多的业务场景。

1.3、CompletableFuture业务应用场景

对于一些耗时操作,尤其是依赖一个或者多个服务的操作,可以使用异步任务来改善程序的性能,加快程序的响应速度。

如果异步任务之间是相互独立的,或者它们之间的某些结果是另一个的输入,可以将这些异步任务构造或合并成一个。

2、CompletableFuture 使用

2.1、CompletableFuture API 介绍

2.1.1、实例化CompletableFuture

//比较特殊,入参就是返回值,也就是说他可以用来执行需要其他返回值的异步任务。1、public static CompletableFuture completedFuture(U value)

//无返回值,采用内部的 ForkJoinPool.commonPool() 获取线程池public static CompletableFuture runAsync(Runnable runnable)

//无返回值,使用自定义线程池public static CompletableFuture runAsync(Runnable runnable, Executor executor)

//有返回值,采用内部的 ForkJoinPool.commonPool() 获取线程池public static CompletableFuture supplyAsync(Supplier supplier)

//有返回值,使用自定义线程池public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

2.1.2、获取任务执行结果

//一直阻塞直到获取到结果public T get();//可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。public T get(long timeout, TimeUnit unit);//就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,//如果执行完成了,就会返回任务执行的结果。public T getNow(T valueIfAbsent);//跟get()的主要区别就是,get()会抛出检查时异常,join()不会public T join();

2.1.3、主动触发任务返回结果

//一直阻塞直到获取到结果public T get();//可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。public T get(long timeout, TimeUnit unit);//就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,//如果执行完成了,就会返回任务执行的结果。public T getNow(T valueIfAbsent);//跟get()的主要区别就是,get()会抛出检查时异常,join()不会public T join();

2.2、CompletableFuture 使用

2.2.1、创建CompletableFuture 异步执行任务

//无返回值,采用内部的 ForkJoinPool.commonPool() 获取线程池public static CompletableFuture runAsync(Runnable runnable)

//有返回值,采用内部的 ForkJoinPool.commonPool() 获取线程池public static CompletableFuture supplyAsync(Supplier supplier)

这一类接口用来创建 CompletableFuture,进行异步执行即可。在这里我使用的是内部默认的ForkJoinPool.commonPool() 来获取线程池。

2.2.1.1、runAsync(无返回值)

2.2.1.2、supplyAsync(有返回值)

2.2.2、接手不抛出异常后的任务进行回调

//可以拿到上一步任务执行的结果进行处理,并且返回处理的结果public CompletionStage thenApply(Function//拿不到上一步任务执行的结果,但会执行Runnable接口的实现public CompletableFuture thenRun(Runnable action);//可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果public CompletionStage thenAccept(Consumer

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会进行回调。

总的来说:

thenApply(有返回值,有入参) ;

thenAccept(无返回值,有入参) ;经常使用在调用链的最末端的最后一个回调函数中使用。

thenRun(无返回值,无入参);经常使用在调用链的最末端的最后一个回调函数中使用。

一般使用thenApply居多。

2.2.2.1、thenApply2.2.2.2、thenRun

2.2.2.3、thenAccept2.2.3、处理任务异常后回调

以下面thenApply为例,出现了异常是不会再出现回调结果的

因此在实际业务场景我们有时需要处理异常后的回调

//当任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。public CompletionStage exceptionally(Function fn);

没有异常时:

  没有异常时就是正常的回调.不会执行exceptionally里面的回调方法

出现异常时:不会执行上一步的,只会执行exceptionally中的方法

2.2.4、同时接收任务执行正常和异常的回调

当业务场景中,认为任务某个节点时在使用时可能会抛出异常,需要做额外的处理,那么就可以使用下面的方法。

//跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,//都能吞了异常,但是handle正常情况下也能回调和thenApply一样。public CompletionStage handle(BiFunction//能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,//whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。public CompletionStage whenComplete(BiConsumer

2.2.4.1、handle方法

1、无异常时和thenApply一样。

2、有异常时,就不会执行上一个的方法

2.2.4.2、whenComplete

无异常时

有异常时

2.2.5、对2个任务结果进行合并

2.2.5.1、thenCombine 有入参 有返回值

//当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果public CompletionStage thenCombine (CompletionStage BiFunction

2.2.5.2、thenAcceptBoth 有入参 无返回值

//当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,无返回值public CompletionStage thenAcceptBoth(CompletionStage BiConsumer

2.2.5.3、runAfterBoth 无入参 无返回值

//两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)public CompletionStage runAfterBoth(CompletionStage other,Runnable action);

总结:thenCombine / thenAcceptBoth / runAfterBoth

    这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

2.2.6、取2个任务结果中最先返回的2.2.6.1、applyToEither 有入参 有返回值

//两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。public CompletionStage applyToEither(CompletionStage

2.2.6.2、acceptEither 有入参 无返回值

//两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。public CompletionStage acceptEither(CompletionStage

2.2.6.3、runAfterEither 无入参 无返回值

//两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)public CompletionStage runAfterEither(CompletionStage other,Runnable action);

总结:applyToEither / acceptEither / runAfterEither

    这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:

2.2.7、下个任务依赖于上个任务的回调结果

      在这里使用thenCompose, thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务,

//thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。public CompletableFuture thenCompose(Function

2.2.8、等待所有任务都完成后,再执行剩下的主流程

     对于下面的案例来说,可以认为是有6个task任务,并行执行,当时最后需要将结果进行统一输出。这种一般来说,适合这种需要并行调用业务接口信息,然后需要再将所有的结果进行处理,返回给下游的业务。

2.2.8.1、带返回值

//等待所有future返回public static CompletableFuture allOf(CompletableFuture... cfs);

    allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

public void allofRetureValue() { List futures = Stream.of(1, 2, 3, 4, 5, 6) .map(i -> CompletableFuture.supplyAsync(() -> { try { // 子任务的执行代码,返回一个字符串结果 // ...

} catch (Exception e) { // 处理子任务执行过程中的异常 e.printStackTrace(); return null; // 返回null表示子任务执行失败 }

return "Result " + i; })) .collect(Collectors.toList());

// 等待所有子任务完成并获取结果 List results = futures.stream() .map(CompletableFuture::join) .filter(result -> result != null) .collect(Collectors.toList());

// 所有子任务完成后,继续执行主流程 // ... }

2.2.8.2、allOf 不带返回值

public void allofNoRetureValue() { List futures = Stream.of(1, 2, 3, 4, 5, 6) .map(i -> CompletableFuture.runAsync(() -> { try { // 子任务的执行代码,返回一个字符串结果 // ...

} catch (Exception e) { // 处理子任务执行过程中的异常 e.printStackTrace(); // ... } })).collect(Collectors.toList());

// 等待所有子任务完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

// 所有子任务完成后,继续执行主流程 // ... }

需要注意的是:

   在上面的CompletableFuture示例代码中,如果子任务执行过程中抛出异常,主线程并不能立即感知,因此需要在子任务中进行异常处理,否则主线程可能会一直等待。

因此为了防止这种情况,可以设置超时处理,如下所示:

public void allOfTimeOut() { List futures = Stream.of(1, 2, 3, 4, 5, 6) .map(i -> CompletableFuture.supplyAsync(() -> { try { // 子任务的执行代码,返回一个字符串结果 // ...

} catch (Exception e) { // 处理子任务执行过程中的异常 e.printStackTrace(); return null; // 返回null表示子任务执行失败 }

return "Result " + i; })) .collect(Collectors.toList());

// 等待所有子任务完成并获取结果 List results = futures.stream() .map(future -> { try { return future.get(Duration.ofSeconds(5).toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS); } catch (Exception e) { // 处理子任务执行过程中的异常或超时 LOGGER.error("allOfTimeOut() error:", e); e.printStackTrace(); future.cancel(true); // 取消未完成的子任务 return null; // 返回null表示子任务执行失败 } }).filter(result -> result != null).collect(Collectors.toList());

// 所有子任务完成后,继续执行主流程 // ... }

2.2.9、多个任务中取最快的返回

     在这里我们使用anyOf来实现。它的含义是只有有任意一个CompletableFuture结束,就可以做接下来的事情,而无须像allof那样,等待所有的CompletableFuture结束。

//多个future执行,取当中最快的一个返回public static CompletableFuture anyOf(CompletableFuture... cfs)

public void anyOf() { try { List futures = Stream.of(1, 2, 3, 4, 5) .map(i -> CompletableFuture.supplyAsync(() -> { try { // 子任务的执行代码,返回一个字符串结果 // ... Thread.sleep(1000); } catch (Exception e) { // 处理子任务执行过程中的异常 LOGGER.error("testStream error->{}", e); e.printStackTrace(); return null; // 返回null表示子任务执行失败 } return "Result " + i; })).collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).join(); } catch (Exception e) { LOGGER.error("runAfterEither() error:", e); } }

2.2.10、所有的以Async结尾的方法说明

     上面说的一些方法,写的案例基本上都是没有带Async结尾的,主要区别就是xxxAsync会重新开一个线程来执行下一阶段的任务,而不带Async还是用上一阶段任务执行的线程执行。

     两个xxxAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

2.2.11、使用建议

      建议使用直接建立新的线程池.

      CompletableFuture 默认使用ForkJoinPool.commonPool(), commonPool是一个会被很多任务共享的线程池,比如同一JVM上的所有CompletableFuture、并行Stream都将共享commonPool,commonPool设计时的目标场景是运行非阻塞的CPU密集型任务,为最大利用CPU,其线程数默认为CPU数量-1。因此在业务开发的时候所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。因此建议新建一个

使用方式,如下所示。

ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100), new ThreadFactory(){ @Override public Thread newThread(Runnable r) { return new Thread("test"); } },new ThreadPoolExecutor.AbortPolicy()); public void completableFutureSupplyAsync() { CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("有返回值 completableFutureSupplyAsync() 开始异步执行"); return "异步运行完成"; },threadPool); String join = future.join();// 等待异步任务完成并获取结果 System.out.println(join); }

历史文章及资料

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OKqocVvff1M3g6q4JEUs6WoQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券