大家好,我是程序员牛肉。
CompletableFuture 作为 Java 异步编程的核心工具,其底层设计巧妙融合了任务编排、线程调度和状态管理。其也成为了企业在开发异步编程的时候绕不开的工具类。最近在刷牛客的时候,也发现它还挺常考的:
因此我们今天就来从源码角度讲解一下它的运行机制。不知道什么是CompletetableFuture可以先看看我之前写的这篇文章:
还在用Future搞异步?快看看企业中常用的CompletableFuture是怎么用的!
[CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和构建复杂的异步任务流水线。它实现了 Future 接口,并提供了更强大的功能,如任务组合、异常处理和回调机制。通过 CompletableFuture,可以轻松地实现非阻塞的异步操作,并链式处理多个任务的依赖关系,从而提升程序的并发性能和响应能力。]
让我们开始看CompletableFuture的源码,在CompletableFuture中有两个最重要的字段:
volatile Object result;//计算结果或者Allsult
volatile Completion stack;//依赖链表
CompletableFuture
的核心状态载体,负责存储异步任务的最终结果或异常信息。null:表示任务尚未完成。非 null:若为普通对象,表示任务正常完成的结果值;若为 AltResult 实例,则表示任务因异常终止(内部封装了 Throwable)CompletableFuture
的所有依赖任务(即后续阶段),是链式调用和任务触发的核心枢纽。栈中每个 Completion
节点代表一个依赖任务(如 thenApply
、thenAccept
等注册的回调),保存其执行函数、目标线程池及下游 CompletableFuture
的引用。当当前 CompletableFuture
完成时,通过遍历 stack
栈中的 Completion
节点,依次触发后续任务的执行(同步或异步)[说人话就是:result就是这些CompletableFuture进行异步编排之后这些子任务的运行结果,而stack则是存储这个编排链路的数据结构。]
当我们使用CompletableFuture进行如下任务编排的时候:
CompletableFuture.supplyAsync(() -> 10)
.thenApply(r -> r * 1)
.thenApply(r -> r * 2)
.thenApply(r -> r * 3);
该任务初始返回值为 10
,然后依次对这个返回值进行三次乘法运算(分别乘以 1
、2
和 3
),整个过程是异步执行的,并且通过链式调用的方式将各个操作串联起来。
基于我们刚才说的两个核心字段,我们可以用流程图绘制这段代码的执行流程:
每一次thenApply方法都会创建出一个completion,这个completion除了我们的lambda函数之外,还会有两个指针指向它的前置future和下一个future,所以当一个子任务完成之后,他将通过指针寻找自己的下一个completion对象,直至到末尾。
[整个过程通过 stack
链表构建了 CompletableFuture
之间的依赖关系,当一个 CompletableFuture
完成时,会根据 stack
链表依次触发后续依赖操作。每个 CompletableFuture
的计算结果存储在 result
字段中,通过这种方式实现了异步任务的链式调用和结果传递。]
需要注意的是:stack是由栈节点组成的链表,那为什么各个节点要是栈呢?这是因为CompletableFuture同时支持多个同级别的后续任务,那么此时的结构图就变为了:
让我们从源码角度看一看这个调用链,当我们使用thenApply的时候:
继续追:
这段代码的逻辑为:先检查传递过来的Function是否为空。之后判断当前阶段是否已经完成,如果result已经有值就说明当前任务已经执行完了,可以直接执行下一个任务(uniApplyNow(r,e,f))。
如果result还等于null,就说明当前任务还没有被执行。那我们就需要把当前这个function通过unipush添加到调用链中,所以让我们继续看unipush方法:
final void unipush(Completion c) {
if (c != null) { // 确保回调任务非空
while (true) { // 自旋循环,保证并发下操作成功
// 尝试将回调压入栈(依赖链)
if (!this.tryPushStack(c)) { // 压栈失败(如并发竞争)
if (this.result == null) { // 当前 Future 未完成
continue; // 重试压栈
}
// 当前 Future 已完成,设置回调的 next 为 null
NEXT.set(c, (Void) null); // 终止链表
}
// 如果当前 Future 已完成(result 非空)
if (this.result != null) {
c.tryFire(0); // 立即触发回调执行
}
break; // 退出循环
}
}
}
通过代码我们可以看到:压栈操作对应的方法是tryPushStack(),继续往里追:
final boolean tryPushStack(Completion c) {
Completion h = this.stack;
NEXT.set(c, h);
return STACK.compareAndSet(this, h, c);
}
首先获取当前栈顶的 Completion
对象并存储到变量 h
中,然后将新传入的 Completion
对象 c
的 next
指针指向 h
,从而将 c
链接到栈中。最后,通过原子操作 compareAndSet
尝试将当前栈顶元素从 h
更新为 c
,如果在这期间栈顶元素没有被其他线程修改,则更新成功并返回 true
,否则返回 false
。
我们可以理解为:同一个栈节点内的多个Completion对象的顺序关系是依赖于Completion中的next指针来实现的,我们来看看Completion的代码:
那这就是thenApply的源码设计,而我们都知道CompletableFuture中的方法基本都会分为同步和异步。thenApply对应的异步方法是thenApplyAsync,那这两个方法的区别是什么?
通过源码我们可以看到,这二者执行的底层方法都是一样的,只不过thenApplyAsync要多提交了一个线程池来实现异步。
[文章末尾我们也会讲解Completablefuture中的默认线程池机制]
看完了thenApply之后,我们再来看一下allof方法的实现机制:
[CompletableFuture.allOf
是 Java 中用于组合多个 CompletableFuture
对象的方法,它允许你等待多个异步任务完成后再执行后续操作。]
allof内部走的addTree方法,我们先来从全局上理一下addTree的设计思路:
[将多个 CompletableFuture
合并为一个新的 CompletableFuture<Void>
,当所有输入的 Future 完成(无论是正常完成还是异常完成)时,这个合并后的 Future 才会完成。]
我们来逐行解析一下allof方法:
当Completablefuture数组不为空的时候,就开始基于分治思想构造平衡二叉树了:
通过中点分割(>>> 1
等效于除以 2),确保合并树的深度为 O(log n)
,避免链式结构导致的 O(n)
深度,减少回调层级,提升性能。
之后需要检查同步结果:
a
和 b
已经完成(result
不为 null
),直接合并结果,无需触发异步回调。这里面蕴含的是“快速失败”的思想,如果在多个completablefuture任务中,很早就出现了异常,那么剩下的任务就不用执行了。直接抛出当前这个异常就好。
所以我们可以看到在这个方法中,只要任何一个任务抛异常之后,我们就会把最终的completablefuture任务d的result设置为NIL,然后直接返回d。而NIL实际上就是一个异常:
[这意味着,如果任何一个任务失败,整个 allOf
操作都会失败,开发者可以统一处理这些异常。]
而CompletableFuture.allOf
通过递归树形结构(由 andTree
方法实现)来组合多个异步任务。这种树形结构可以显著减少线程的同步开销。在传统的线性等待中,每个任务都需要单独等待,这会导致大量的线程同步操作。而树形结构可以将任务分层处理,减少同步的次数,从而提高并发任务的处理效率。
基于allof的这种设计,假设我们有8个Completablefuture任务,allof方法会将其编排为:
这样的话,我们不再需要逐个检查这八个节点是否都完成,而是尝试去检查其更高层的父节点。基于这种操作,我们就大大减少了同步次数。
最后我们来讲一个有意思的:前面我们说到了Completablefuture中的异步方法是基于传递线程池实现的,那么Completablefuture中使用的默认线程池是什么呢?
很多同学可能立马就能想到“ForkJoinPool”,你要是这么回答到也还可以,但是深度不够。
让我们从源码中获取答案:
我们可以看到:默认线程池是根据USE_COMMON_POOL这个布尔值来进行选择的。
USE_COMMON_POOL
为 true
,则 ASYNC_POOL
被设置为 ForkJoinPool.commonPool()
。USE_COMMON_POOL
为 false
,则 ASYNC_POOL
被设置为一个 ThreadPerTaskExecutor
。[ThreadPerTaskExecutor
是一个特殊的线程池实现,其核心设计思想是为每个提交的任务创建一个独立的线程来执行。这种设计特别适合轻量级任务的执行,理论上讲可以创建无数个线程,但实际还是受限于系统资源。]
那么USE_COMMON_POOL是如何进行取值的呢?
它使用的是getCommonPoolPrarllelism来进行的判断,这个方法返回公共线程池的并行度,默认情况下,公共线程池的并行度等于系统的可用处理器数量。
默认情况下,ForkJoinPool.commonPool
的并行度等于系统的可用处理器数量减去1。
ForkJoinPool.getCommonPoolParallelism()
返回 3,那么 USE_COMMON_POOL
将被设置为 true
,此时的默认线程池为ForkJoinPool。ForkJoinPool.getCommonPoolParallelism()
返回 1,那么 USE_COMMON_POOL
将被设置为 false
,此时的默认线程池为:ThreadPerTaskExecutor 。也就是说CompletableFuture的默认线程池只有在双核以上的机器内才会使用。在双核及以下的机器中,会为每个任务创建一个新线程,等于没有使用线程池,且有资源耗尽的风险。
而且吧,就算是ForkJoinPool也会有很多槽点,因此在使用Complefuturetable的时候,强烈推荐使用自定义线程池。
那么今天关于Completablefuture的文章就介绍到这里了。相信通过我的介绍,你已经大致了解Completablefuture的一些底层机制。希望我的文章可以帮到你。