前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >下次换你来拷打面试官!一文带你读懂企业常用异步编程核心工具类CompletableFuture

下次换你来拷打面试官!一文带你读懂企业常用异步编程核心工具类CompletableFuture

作者头像
程序员牛肉
发布2025-02-04 13:30:13
发布2025-02-04 13:30:13
7600
代码可运行
举报
文章被收录于专栏:小牛肉带你学Java
运行总次数:0
代码可运行

大家好,我是程序员牛肉。

CompletableFuture 作为 Java 异步编程的核心工具,其底层设计巧妙融合了任务编排线程调度状态管理。其也成为了企业在开发异步编程的时候绕不开的工具类。最近在刷牛客的时候,也发现它还挺常考的:

因此我们今天就来从源码角度讲解一下它的运行机制。不知道什么是CompletetableFuture可以先看看我之前写的这篇文章:

还在用Future搞异步?快看看企业中常用的CompletableFuture是怎么用的!

2024-11-19

[CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和构建复杂的异步任务流水线。它实现了 Future 接口,并提供了更强大的功能,如任务组合、异常处理和回调机制。通过 CompletableFuture,可以轻松地实现非阻塞的异步操作,并链式处理多个任务的依赖关系,从而提升程序的并发性能和响应能力。]

让我们开始看CompletableFuture的源码,在CompletableFuture中有两个最重要的字段:

代码语言:javascript
代码运行次数:0
复制
    volatile Object result;//计算结果或者Allsult
    volatile Completion stack;//依赖链表
  • result:该字段是 CompletableFuture 的核心状态载体,负责存储异步任务的最终结果或异常信息。null:表示任务尚未完成。非 null:若为普通对象,表示任务正常完成的结果值;若为 AltResult 实例,则表示任务因异常终止(内部封装了 Throwable)
  • stack:一个用链表实现的栈,管理当前 CompletableFuture 的所有依赖任务(即后续阶段),是链式调用和任务触发的核心枢纽。栈中每个 Completion 节点代表一个依赖任务(如 thenApplythenAccept 等注册的回调),保存其执行函数、目标线程池及下游 CompletableFuture 的引用。当当前 CompletableFuture 完成时,通过遍历 stack 栈中的 Completion 节点,依次触发后续任务的执行(同步或异步)

[说人话就是:result就是这些CompletableFuture进行异步编排之后这些子任务的运行结果,而stack则是存储这个编排链路的数据结构。]

当我们使用CompletableFuture进行如下任务编排的时候:

代码语言:javascript
代码运行次数:0
复制
CompletableFuture.supplyAsync(() -> 10)
                 .thenApply(r -> r * 1)
                 .thenApply(r -> r * 2)
                 .thenApply(r -> r * 3);

该任务初始返回值为 10,然后依次对这个返回值进行三次乘法运算(分别乘以 123),整个过程是异步执行的,并且通过链式调用的方式将各个操作串联起来。

基于我们刚才说的两个核心字段,我们可以用流程图绘制这段代码的执行流程:

每一次thenApply方法都会创建出一个completion,这个completion除了我们的lambda函数之外,还会有两个指针指向它的前置future和下一个future,所以当一个子任务完成之后,他将通过指针寻找自己的下一个completion对象,直至到末尾。

[整个过程通过 stack 链表构建了 CompletableFuture 之间的依赖关系,当一个 CompletableFuture 完成时,会根据 stack 链表依次触发后续依赖操作。每个 CompletableFuture 的计算结果存储在 result 字段中,通过这种方式实现了异步任务的链式调用和结果传递。]

需要注意的是:stack是由栈节点组成的链表,那为什么各个节点要是栈呢?这是因为CompletableFuture同时支持多个同级别的后续任务,那么此时的结构图就变为了:

让我们从源码角度看一看这个调用链,当我们使用thenApply的时候:

继续追:

这段代码的逻辑为:先检查传递过来的Function是否为空。之后判断当前阶段是否已经完成,如果result已经有值就说明当前任务已经执行完了,可以直接执行下一个任务(uniApplyNow(r,e,f))。

如果result还等于null,就说明当前任务还没有被执行。那我们就需要把当前这个function通过unipush添加到调用链中,所以让我们继续看unipush方法:

代码语言:javascript
代码运行次数:0
复制
 final void unipush(Completion c) {
    if (c != null) {  // 确保回调任务非空
        while (true) {  // 自旋循环,保证并发下操作成功
            // 尝试将回调压入栈(依赖链)
            if (!this.tryPushStack(c)) {  // 压栈失败(如并发竞争)
                if (this.result == null) {  // 当前 Future 未完成
                    continue;  // 重试压栈
                }
                // 当前 Future 已完成,设置回调的 next 为 null
                NEXT.set(c, (Void) null);  // 终止链表
            }
            // 如果当前 Future 已完成(result 非空)
            if (this.result != null) {
                c.tryFire(0);  // 立即触发回调执行
            }
            break;  // 退出循环
        }
    }
}

通过代码我们可以看到:压栈操作对应的方法是tryPushStack(),继续往里追:

代码语言:javascript
代码运行次数:0
复制
    final boolean tryPushStack(Completion c) {
        Completion h = this.stack;
        NEXT.set(c, h);
        return STACK.compareAndSet(this, h, c);
    }

首先获取当前栈顶的 Completion 对象并存储到变量 h 中,然后将新传入的 Completion 对象 cnext 指针指向 h,从而将 c 链接到栈中。最后,通过原子操作 compareAndSet 尝试将当前栈顶元素从 h 更新为 c,如果在这期间栈顶元素没有被其他线程修改,则更新成功并返回 true,否则返回 false

我们可以理解为:同一个栈节点内的多个Completion对象的顺序关系是依赖于Completion中的next指针来实现的,我们来看看Completion的代码:

那这就是thenApply的源码设计,而我们都知道CompletableFuture中的方法基本都会分为同步和异步。thenApply对应的异步方法是thenApplyAsync,那这两个方法的区别是什么?

通过源码我们可以看到,这二者执行的底层方法都是一样的,只不过thenApplyAsync要多提交了一个线程池来实现异步。

[文章末尾我们也会讲解Completablefuture中的默认线程池机制]

看完了thenApply之后,我们再来看一下allof方法的实现机制:

[CompletableFuture.allOf 是 Java 中用于组合多个 CompletableFuture 对象的方法,它允许你等待多个异步任务完成后再执行后续操作。]

allof内部走的addTree方法,我们先来从全局上理一下addTree的设计思路:

[将多个 CompletableFuture 合并为一个新的 CompletableFuture<Void>,当所有输入的 Future 完成(无论是正常完成还是异常完成)时,这个合并后的 Future 才会完成。]

我们来逐行解析一下allof方法:

  • cfs:待合并的completablefuture数组。
  • lo和hi:这个数组的左右区间,也可以理解为是待递归处理的数组区间。’

当Completablefuture数组不为空的时候,就开始基于分治思想构造平衡二叉树了:

通过中点分割(>>> 1 等效于除以 2),确保合并树的深度为 O(log n),避免链式结构导致的 O(n) 深度,减少回调层级,提升性能。

之后需要检查同步结果:

  • 同步检查优化:如果两个子 Future ab 已经完成(result 不为 null),直接合并结果,无需触发异步回调。
  • 异常传播:若任一子 Future 异常,合并后的 Future 会记录第一个异常,确保快速失败。

这里面蕴含的是“快速失败”的思想,如果在多个completablefuture任务中,很早就出现了异常,那么剩下的任务就不用执行了。直接抛出当前这个异常就好。

所以我们可以看到在这个方法中,只要任何一个任务抛异常之后,我们就会把最终的completablefuture任务d的result设置为NIL,然后直接返回d。而NIL实际上就是一个异常:

[这意味着,如果任何一个任务失败,整个 allOf 操作都会失败,开发者可以统一处理这些异常。]

CompletableFuture.allOf 通过递归树形结构(由 andTree 方法实现)来组合多个异步任务。这种树形结构可以显著减少线程的同步开销。在传统的线性等待中,每个任务都需要单独等待,这会导致大量的线程同步操作。而树形结构可以将任务分层处理,减少同步的次数,从而提高并发任务的处理效率。

基于allof的这种设计,假设我们有8个Completablefuture任务,allof方法会将其编排为:

这样的话,我们不再需要逐个检查这八个节点是否都完成,而是尝试去检查其更高层的父节点。基于这种操作,我们就大大减少了同步次数。

最后我们来讲一个有意思的:前面我们说到了Completablefuture中的异步方法是基于传递线程池实现的,那么Completablefuture中使用的默认线程池是什么呢?

很多同学可能立马就能想到“ForkJoinPool”,你要是这么回答到也还可以,但是深度不够。

让我们从源码中获取答案:

我们可以看到:默认线程池是根据USE_COMMON_POOL这个布尔值来进行选择的。

  • 如果 USE_COMMON_POOLtrue,则 ASYNC_POOL 被设置为 ForkJoinPool.commonPool()
  • 如果 USE_COMMON_POOLfalse,则 ASYNC_POOL 被设置为一个 ThreadPerTaskExecutor

[ThreadPerTaskExecutor 是一个特殊的线程池实现,其核心设计思想是为每个提交的任务创建一个独立的线程来执行。这种设计特别适合轻量级任务的执行,理论上讲可以创建无数个线程,但实际还是受限于系统资源。]

那么USE_COMMON_POOL是如何进行取值的呢?

它使用的是getCommonPoolPrarllelism来进行的判断,这个方法返回公共线程池的并行度,默认情况下,公共线程池的并行度等于系统的可用处理器数量。

默认情况下,ForkJoinPool.commonPool 的并行度等于系统的可用处理器数量减去1。

  • 假设你的系统有 4 个核心,ForkJoinPool.getCommonPoolParallelism() 返回 3,那么 USE_COMMON_POOL 将被设置为 true,此时的默认线程池为ForkJoinPool。
  • 假设你的系统有两个核心,ForkJoinPool.getCommonPoolParallelism() 返回 1,那么 USE_COMMON_POOL 将被设置为 false,此时的默认线程池为:ThreadPerTaskExecutor 。

也就是说CompletableFuture的默认线程池只有在双核以上的机器内才会使用。在双核及以下的机器中,会为每个任务创建一个新线程,等于没有使用线程池,且有资源耗尽的风险。

而且吧,就算是ForkJoinPool也会有很多槽点,因此在使用Complefuturetable的时候,强烈推荐使用自定义线程池。

那么今天关于Completablefuture的文章就介绍到这里了。相信通过我的介绍,你已经大致了解Completablefuture的一些底层机制。希望我的文章可以帮到你。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员牛肉 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档