首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并发编程

并发编程

原创
作者头像
用户11708420
修改2025-07-08 15:22:25
修改2025-07-08 15:22:25
13000
代码可运行
举报
运行总次数:0
代码可运行

并发

  • 同时完成多任务。无需等待当前任务完成即可执行其他任务。“并发”解决了程序因外部控制而无法进一步执行的阻塞问题。最常见的例子就是 I/O 操作,任务必须等待数据输入(在一些例子中也称阻塞)。这个问题常见于 I/O 密集型任务。

并行

  • 同时在多个位置完成多任务。这解决了所谓的 CPU 密集型问题:将程序分为多部分,在多个处理器上同时处理不同部分来加快程序执行效率。

区别

  • 并发是 “一个人切换着干多个事”,核心是 “不等待”,解决 “等不及” 的阻塞问题;比如你等面泡好边玩手机
  • 并行是 “多个人同时干不同的事”,核心是 “真同时”,解决 “干得慢” 的效率问题。比如大家一起包饺子 你擀皮他包馅,两人同时动手(多个 “处理器”),比一个人又擀又包快得多。
  • 两者都能 “同时推进多个任务”,但一个靠切换避免等待,一个靠分工提速,适用的场景不一样。

四种任务处理方式(纯并发 / 并发 - 并行 / 并行 - 并发 / 纯并行)

可以把 “处理器” 想象成 “干活的人”,任务就是要做的活儿:

  • 纯并发:只有 1 个人干活,但他会在多个任务间来回切换(比如一边煮水一边切菜,水开了就去关火,回来继续切)。哪怕再加人(多处理器),他还是一个人切换着干,效率不会变高。
  • 并发 - 并行:原本 1 个人切换着干,要是再来几个人(多处理器),就能分给他们一起干,人越多越快(比如原本 1 人切菜 + 炒菜,来人后 1 人切、1 人炒,更快完成)。
  • 并行 - 并发:本来是设计给多个人一起干的活儿(比如两人合作包饺子,一人擀皮一人包),但如果只有 1 个人,也能自己先擀几个皮、再包几个,慢慢完成(不会干不了)。Java 8 的 Streams 就是这种,多处理器能加速,单处理器也能跑。
  • 纯并行:必须多个人一起干,少一个人都不行。

“抽象泄露”:看似简单的工具,藏着一堆麻烦

简单说,就是 “本以为能用一个简单的工具搞定复杂问题,结果工具底下的细节全冒出来了,躲都躲不掉”。

比如,你想用一个 “并发编程库” 写程序,以为它会帮你处理好所有底层细节(比如 CPU 怎么分工、数据怎么同步),结果写着写着就发现:

  • 这个库在 A 电脑上跑得好好的,到 B 电脑就出错(因为两台电脑的 CPU 缓存、配置不一样);
  • 测试时任务少没问题,用户多了就卡壳(因为没考虑到实际负载)。

这些底层细节(CPU 怎么工作、数据怎么传递)本来该被工具 “藏起来”,结果全漏出来捣乱,这就是 “抽象泄露”。并发 / 并行编程几乎都这样,这不是Java特有的 - 这是并发和并行编程的本质。

纯函数式语言也救不了?

有人觉得 “纯函数式语言”(比如 Haskell)能避免这些问题,因为它的代码逻辑更严谨,不容易出数据混乱。

确实,它能解决很多并发问题,但也不是万能的。比如你用它写了一个 “消息队列”(比如处理用户订单的排队系统):

  • 如果没算好队列能装多少订单,用户突然变多,队列就会塞满卡死;
  • 或者限制订单数量时没考虑不同场景(比如促销时该放宽限制,平时该收紧),照样出问题。

最后你会发现:不管用什么语言,都得把底层细节(队列大小、处理速度、硬件限制)摸透,不然系统迟早出问题。并发 / 并行编程的本质就是这样 —— 看似是 “同时干多件事”,实则处处是坑,必须精打细算。

并发的意义

并发性是一系列性能技术,专注于减少等待

  • 并发属于性能技术,核心是让程序更快,Java 中使用需谨慎,仅在有重大性能问题时采用最简单方式。
  • “减少等待” 是关键,只有程序存在等待时并发才能发挥作用。比如如果你发起I/O请求并立即获得结果,没有延迟,因此无需改进。如果你在多个处理器上运行多个任务,并且每个处理器都以满容量运行,并且没有任务需要等待其他任务,那么尝试提高吞吐量是没有意义的。
  • 等待可以以多种形式出现 - 这解释了为什么存在如此不同的并发方法。

并发的超能力

并发的适用场景与影响因素

  • 单处理器情况:任务切换成本由该处理器承担,若所有任务持续运行,并发可能让系统变慢。但在任务有等待(如 “分身” 敲门等待)时,并发仍有益,可切换到就绪任务。且有时为了代码简洁,即使单处理器也可能使用并发。
  • 多处理器情况:对于计算约束问题(如破解密码),线程数与处理器数量一致时效率较高,参与的线程越多,可能越快得到结果。
  • 其他例子:客户服务部门中,员工(处理器)数量有限,电话(任务)需排队;“鞋匠和精灵” 故事中,制鞋某些环节的限制会影响整体速度,说明问题本身会驱动解决方案设计。

并发的问题与挑战

  • 共享内存问题:类似工厂工人放蛋糕到同一个盒子里可能出现的冲突,并发任务因共享资源会产生竞争条件,结果取决于哪个任务先操作,通常用锁机制解决。
  • 非确定性:并发程序虽理论上可通过维护和检查保证正常工作,但实践中可能在特定条件下失败,且测试难以复现故障,失败可能以客户投诉形式出现。

实现并发的方法

  • 进程方式:在操作系统级别实现,进程在自身地址空间运行,相互隔离,编程相对容易。但存在数量和开销限制,适用性有限。
  • 线程方式:Java 采用的方式,线程在单个进程中创建,共享内存和 I/O 等资源,难点在于协调不同线程任务对资源的访问。
  • 函数式语言:部分编程语言让并发任务彼此隔离,如 Erlang,还包含任务间安全通信机制,适合程序中大量使用并发的部分。

java并发的四句格言

1.不要这样做

  • 避免自己动手处理并发问题。因为并发会产生诸多深层且微妙的陷阱,即使在处理简单事情时看似安全,实则风险很大,若能避免,能让编程工作更轻松。
  • 若不得不进行并发处理,应采取最简单、最安全的方法解决问题。要使用众所周知的库,尽可能少地自己编写相关代码,在并发问题上,“简单” 并非缺点,自负才是需要避免的。

2.没有什么是真的,一切可能都有问题

  • 并发编程有不确定性:在并发领域,没有绝对确定的事情,需要以怀疑的态度看待一切。即使是简单的变量赋值,也可能不按预期工作,存在诸多看似有效实则无效的情况。
  • 并发编程需关注细节:非并发程序中可忽略的细节,在并发程序中变得至关重要。例如,需了解处理器缓存及本地缓存与主内存的一致性问题,深入掌握对象构造的复杂性以避免构造器意外暴露数据给其他线程修改等。

上述涉及的主题非常复杂,本文无法解释,自行了解这些内容(并建议参考《Java Concurrency in Practice》)。

3.它起作用,并不意味着它没有问题

  • 并发程序的隐蔽性问题:编写看似正常却存在问题的并发程序很容易,且问题往往在极端情况下才会暴露,可能在程序部署后引发用户问题。
  • 正确性验证的困境:无法证明并发程序是正确的,最多只能证明其不正确,甚至多数时候连证明不正确都做不到,因为问题可能难以检测。同时,通常也无法编写有效的测试来验证,需依靠代码检查结合深入的并发知识来发现错误。
  • 设计参数的约束:即使有效的并发程序,也只能在其设计参数范围内正常工作,超出范围就可能出现故障。
  • 认知偏差与误区:从确定性编程转向并发编程时,容易出现 “邓宁 - 克鲁格效应”,即不熟练的人会错误高估自己的能力。实际情况是,无论对自己的线程安全代码多有把握,都可能存在问题,且编译器不会提示错误,必须先掌握所有并发问题才能写出正确的代码。在并发领域,“无明显错误和编译错误” 不代表程序正常,过度自信是大忌。

隐蔽性问题的典型例子是竞态条件(Race Condition) 导致的偶发错误,这类问题在常规测试中可能难以显现,却会在高并发的极端场景下暴露,以下是一个具体案例:

假设有一个简单的计数器程序,多个线程同时对一个共享变量count进行自增操作(count++)。从表面上看,这段代码逻辑清晰,在单线程环境下完全正常,但在多线程并发时,却可能因为操作的非原子性出现问题。

count++的底层操作可拆解为三步:

  1. 读取当前count的值;
  2. 对该值进行加 1 运算;
  3. 将结果写回count

当两个线程同时执行这一操作时,可能出现以下极端情况:

  • 线程 A 读取count的值为 10;
  • 线程 B 同时读取count的值也为 10;
  • 线程 A 完成加 1,将 11 写回count
  • 线程 B 也完成加 1,同样将 11 写回count

原本预期两次自增后count应为 12,但实际结果却是 11,这就是典型的竞态条件导致的错误。这种问题在线程数量少、执行频率低时(如测试环境)很难触发,一旦部署到高并发场景(如大量用户同时操作),就会频繁出现数据不一致的问题,且由于错误具有偶发性,排查难度极大。

这个例子直观体现了并发程序中 “看似正常却存在隐蔽问题” 的特点 —— 代码在表面上没有语法错误,常规测试也可能表现正常,但在极端并发条件下会暴露深层缺陷,进而引发用户可见的问题(如数据错误、逻辑异常等)。

这种问题 “难触发” 的核心原因在于:错误是否出现取决于线程执行的 “时间窗口” 是否恰好重合,而在低并发场景下,这个窗口出现的概率极低。

以计数器例子具体说明:

  • 当线程数量少(比如只有 2 个线程)、执行频率低(比如每秒仅执行几次自增)时,两个线程恰好同时读取到同一个count值(即 “时间窗口重合”)的概率非常小。多数情况下,线程 A 会完整执行 “读 - 加 - 写” 三步后,线程 B 才开始操作,结果会是正确的 12。
  • 只有当线程 A 还没写完新值时,线程 B 恰好开始读取旧值,才会出现结果错误(11)。这种情况在低并发下属于 “小概率事件”,可能测试几十次、几百次都遇不到,从而让人误以为代码没问题。

而在高并发场景下(比如 100 个线程同时高频操作),线程间的执行顺序会变得极度混乱,“时间窗口重合” 的概率会急剧升高,错误就会频繁暴露。

因此,这类问题的隐蔽性就在于:它不是 “必然出错”,而是 “可能出错”,在低并发测试中容易被漏检,直到高并发环境才会集中爆发。

4.你仍然必须理解它

  • 并发很重要,要学好

并行流(Parallel Streams)

并行流的好处

  • Java 8 的流有个方便的功能,加个parallel()就能让流并行处理,利用多处理器加快速度。比如找质数的例子里,用了parallel()后,速度差不多是不用时的 3 倍,确实很划算。

parallel () 不是万能的

  • 比如求和时,用range()生成序列再并行求和,速度很快;但如果用iterate()生成序列,再加parallel(),不仅可能更慢,数据量大了还会耗光内存。
  • 数据结构影响很大:数组分割起来方便,适合并行;链表拆分麻烦,并行效果差。原始类型数组(比如 long 数组)存的是连续数据,处理器缓存能高效利用,速度快;包装类数组(比如 Long 数组)存的是对象引用,缓存用起来费劲,速度慢很多。

parallel () 和 limit () 一起用容易出问题:

  • 比如用Stream.generate()生成序列,加parallel()limit(10),本来想要 10 个数,结果内部可能生成 1024 个,还可能出现随机顺序的结果。这是因为并行时多个线程会预取很多值,最后只取前 10 个,新手容易踩坑。

总结:并行流看起来简单,加个parallel()就行,但其实有很多门道。不能随便乱加,得先弄明白并行到底能不能帮上忙,最好先测试再用。记住 “先让程序跑起来,再考虑提速,而且只在必须的时候才优化”。

创建与运行

为什么不用 Thread 而用 ExecutorService?

写并发代码,几乎不会直接用new Thread(),而是用ExecutorService(线程池),原因很简单:

  • 手动创建线程像 “每次用一次性筷子”,用完就扔,浪费资源;
  • 线程池像 “可重复使用的筷子”,线程用完后回收,避免频繁创建销毁的开销,这在高并发场景(比如服务器处理大量请求)中是必须的。

最常用的两种线程池

1. 单线程池(SingleThreadExecutor)
  • 特点:只有 1 个线程,任务排队执行,不会同时运行。
  • 场景:需要 “顺序执行 + 线程安全” 的任务(比如处理数据库事务日志,必须按顺序写,不能乱)。
2. 缓存线程池(CachedThreadPool)
  • 特点:自动创建新线程,任务多就多开线程,任务少就回收,适合短期任务。
  • 场景:处理大量独立的短任务(电商平台处理用户同时发起的多个 “商品详情页查询)。

任务的两种类型:Runnable 和 Callable

里根据是否需要返回结果,选择不同的任务类型:

类型

特点

场景

Runnable

无返回值,用run()方法

只需要执行动作(比如打印日志)

Callable

有返回值,用call()方法

需要计算结果(比如统计订单金额)

用单线程池执行 Runnable 任务(顺序执行)
代码语言:javascript
代码运行次数:0
运行
复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

// 1. 定义任务(实现Runnable,无返回值)
class PrintTask implements Runnable {
    private int id;
    public PrintTask(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        // 模拟任务执行(比如处理数据)
        try {
            Thread.sleep(100); // 休眠100ms,模拟耗时
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务" + id + "完成,线程:" + Thread.currentThread().getName());
    }
}

public class SingleThreadDemo {
    public static void main(String[] args) {
        // 2. 创建单线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 3. 提交10个任务
        IntStream.range(0, 10).forEach(i -> executor.execute(new PrintTask(i)));
        
        // 4. 关闭线程池(重要:用完必须关,否则程序不会结束)
        executor.shutdown();
    }
}
用缓存线程池执行 Callable 任务(并行计算)
代码语言:javascript
代码运行次数:0
运行
复制
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// 1. 定义有返回值的任务(实现Callable)
class SumTask implements Callable<Integer> {
    private int start;
    private int end;
    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    public Integer call() {
        int sum = 0;
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        System.out.println("计算" + start + "-" + end + ",线程:" + Thread.currentThread().getName());
        return sum;
    }
}

public class CachedThreadPoolDemo {
    public static void main(String[] args) throws Exception {
        // 2. 创建缓存线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 3. 提交3个计算任务(分别计算1-100,101-200,201-300的和)
        List<SumTask> tasks = IntStream.range(0, 3)
                .mapToObj(i -> new SumTask(i * 100 + 1, (i + 1) * 100))
                .collect(Collectors.toList());
        
        // 4. 执行所有任务并获取结果(invokeAll会等待所有任务完成)
        List<Future<Integer>> results = executor.invokeAll(tasks);
        
        // 5. 汇总结果
        int total = 0;
        for (Future<Integer> result : results) {
            total += result.get(); // get()会获取返回值,若未完成则等待
        }
        System.out.println("总和:" + total); // 结果应为45150
        
        // 6. 关闭线程池
        executor.shutdown();
    }
}

注意事项

  • 线程池必须关闭:用shutdown()(等待任务完成后关闭),否则程序会一直运行。
  • 避免共享变量:多个线程操作同一个变量(比如 Demo1 中的val)会出问题,尽量让任务独立(像 Demo2 的 Callable,每个任务自己计算,不共享数据)。
  • 别依赖 Future 的 get () 阻塞:网页提到 Future 的get()会 “等结果”,可能影响效率,企业里现在更推荐用 Java8 的CompletableFuture(后续会讲)。

如何优雅地终止耗时的并发任务

为啥需要终止任务?

  • 并发程序里常有长时间运行的任务(比如后台进程),有时需要在它们正常结束前停下来(比如程序要关闭了)。

别用 “硬中断”,容易出乱子

  • Java 早期有 “中断任务” 的机制,但就像突然拔电源关电脑 —— 可能导致数据丢失、状态混乱,现在不推荐用。

推荐方法:给任务设个 “停止信号”

  • 让任务自己定期检查一个 “是否要停” 的标志,看到标志说 “停”,就自己收拾好再退出。就像老师下课前敲铃,学生听到后先整理好东西再离开,而不是被强行拽出教室。

用 AtomicBoolean 保证信号靠谱

  • 这个 “停止信号” 得用线程安全的AtomicBoolean(原子布尔值),不能用普通的boolean。因为多个线程同时改一个普通布尔值可能出问题(比如两个线程同时点 “停止”,结果信号没传对),而AtomicBoolean能防这种混乱。

CompletableFuture类

什么是 CompletableFuture?

简单来说,CompletableFuture 实现了 FutureCompletionStage 接口。CompletableFuture 本质上代表了一个将来会完成的异步操作。 你可以把它想象成一个“占位符”或者“承诺”,它承诺在未来某个不确定的时间点会给你一个结果。这个结果可能是成功的值,也可能是操作中出现的异常。

它既可以作为生产者(提供结果),也可以作为消费者(处理结果)。

  • 生产者角色: 当你启动一个异步任务时,比如进行网络请求或者执行耗时计算,这个任务不会立即返回结果,而是返回一个 CompletableFuture 对象。这个对象就代表了未来这个任务的执行结果。当任务完成时,它会将结果“放入”这个 CompletableFuture 中。
  • 消费者角色: 你可以注册一系列的回调函数(消费者)到这个 CompletableFuture 上。当生产者完成并将结果放入时,这些回调函数就会被自动触发执行,从而处理这个结果。

异步的核心在于非阻塞。虽然消费者最终确实需要生产者的结果才能继续处理,但它们不必同步地等待。以下几点体现了 CompletableFuture 的异步性:

  1. 任务提交后立即返回: 当你使用 CompletableFuture 启动一个异步任务时,例如 supplyAsync()runAsync(),程序会立即得到一个 CompletableFuture 对象,而不会停下来等待任务执行完成。你的主线程可以继续执行其他操作,而不会被阻塞。
  2. 回调机制: 消费者不是通过不断地轮询(“你好了吗?你好了吗?”)来获取结果,而是通过注册回调函数。当生产者完成任务后,它会“通知”这些回调函数,然后这些回调函数才会被执行。这就像你点了一份外卖,你不需要一直站在厨房门口等,而是外卖做好了会通知你来取一样。
  3. 事件驱动: 这种机制可以看作是事件驱动的。生产者的完成是一个“事件”,这个事件触发了消费者的执行。这与传统的同步编程中,一步一步顺序执行的流程截然不同。
  4. 链式操作与并行: CompletableFuture 提供了丰富的API,允许你将多个异步操作串联起来(链式操作),或者将它们并行执行,从而构建复杂的异步流程。例如,你可以说“当任务A完成后,就去执行任务B;如果任务B也完成了,就去执行任务C”。所有这些步骤都可以在不阻塞主线程的情况下进行。

创建CompletableFuture

  • supplyAsync(): 如果你的异步操作会返回一个结果,可以使用 supplyAsync()。它接受一个 Supplier 函数式接口。
代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BasicUsage {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        // 模拟一个耗时操作,返回一个字符串
        Supplier<String> supplier = () -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 4)); // 模拟1-3秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "异步操作的结果";
        };

        CompletableFuture<String> future = CompletableFuture.supplyAsync(supplier);

        // 注册回调函数,当异步操作完成时执行
        future.thenAccept(result -> {
            System.out.println("异步操作完成,结果是:" + result);
        });

        System.out.println("主线程继续执行,不等待异步操作...");

        // 为了演示,让主线程等待一下异步操作完成
        future.join(); // 阻塞当前线程直到CompletableFuture完成并获取结果
        // 也可以使用 future.get(); 但它会抛出检查性异常
        // future.get(5, TimeUnit.SECONDS); // 可以设置超时
        System.out.println("主线程结束。");
    }
}

在这个例子中,supplyAsync() 会在一个新的线程中执行 supplier 提供的逻辑。thenAccept() 用于注册一个回调,当异步操作完成并获得结果时,这个回调就会被执行。join() 方法会阻塞当前线程直到 CompletableFuture 完成并返回结果。

  • runAsync(): 如果你的异步操作不返回任何结果(类似 void 方法),可以使用 runAsync()。它接受一个 Runnable 函数式接口。
代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class RunAsyncExample {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2); // 模拟2秒延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("异步操作完成,没有返回结果。");
        });
        System.out.println("主线程任务不需要等待,执行主线的任务。");
        future.join(); // 等待异步操作完成
        System.out.println("主线程结束。");
    }
}

获取结果

  • join(): 阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果 CompletableFuture 异常完成,它会抛出一个 CompletionException
  • get(): 阻塞当前线程,直到 CompletableFuture 完成并返回结果。与 join() 不同,get() 会抛出 InterruptedExceptionExecutionException(一个检查性异常,包装了实际的异常)。
  • 回调方法 (thenAccept, thenApply, thenRun 等): 这是非阻塞获取结果的方式。你可以在 CompletableFuture 完成时注册一个操作。
    • thenAccept(Consumer<? super T> action): 消费结果,不返回新的 CompletableFuture。
    • thenApply(Function<? super T, ? extends U> fn): 将结果转换成另一种类型,并返回一个新的 CompletableFuture。
    • thenRun(Runnable action): 不关心结果,只在 CompletableFuture 完成时执行一个操作。

结合 CompletableFuture

CompletableFuture 的强大之处在于它允许你将多个异步操作组合在一起,形成一个复杂的异步工作流。

链式调用

thenApply, thenAccept, thenCompose 等方法允许你链式调用异步操作。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class CombineCompletableFutures {

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> fetchUserData = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一步:正在获取用户数据...");
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "用户ID:123, 名称:张三";
        });

        CompletableFuture<Double> calculateDiscount = fetchUserData.thenApplyAsync(userData -> {
            System.out.println("第二步:根据用户数据计算折扣...");
            // 假设从用户数据中提取ID并计算折扣
            String userId = userData.split(",")[0].split(":")[1];
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return Double.parseDouble(userId) * 0.01; // 模拟折扣
        });

        CompletableFuture<String> applyDiscountAndGenerateInvoice = calculateDiscount.thenApplyAsync(discount -> {
            System.out.println("第三步:应用折扣并生成发票...");
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "发票已生成,折扣为:" + String.format("%.2f", discount * 100) + "%";
        });

        applyDiscountAndGenerateInvoice.thenAccept(invoice -> {
            System.out.println("最终结果:" + invoice);
        }).join(); // 等待所有链式操作完成

        System.out.println("主线程结束。");
    }
}

组合多个 CompletableFuture

有时你需要等待多个独立的 CompletableFuture 都完成后再进行下一步操作。

allOf(): 等待所有给定的 CompletableFuture 完成。它返回一个 CompletableFuture<Void>

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AllOfExample {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "任务1完成";
        });

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "任务2完成";
        });

        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "任务3完成";
        });

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

        allTasks.thenRun(() -> {
            try {
                // 当所有任务都完成后,才能安全地获取它们的结果
                System.out.println(task1.join());
                System.out.println(task2.join());
                System.out.println(task3.join());
                System.out.println("所有任务都已完成!");
            } catch (Exception e) {
                System.err.println("有任务异常终止: " + e.getMessage());
            }
        }).join();

        System.out.println("主线程结束。");
    }
}

anyOf(): 任何一个给定的 CompletableFuture 完成时就完成。它返回一个 CompletableFuture<Object>,其结果是第一个完成的 CompletableFuture 的结果。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class AnyOfExample {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "这是最快的任务!";
        });

        CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(4, 6));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "这个任务有点慢。";
        });

        CompletableFuture<Object> anyTask = CompletableFuture.anyOf(fastTask, slowTask);

        anyTask.thenAccept(result -> {
            System.out.println("最先完成的任务结果是: " + result);
        }).join();

        System.out.println("主线程结束。");
    }
}

模拟耗时操作

在实际开发中,CompletableFuture 经常用于模拟网络请求、数据库操作、文件读写等耗时任务。通过 TimeUnit.SECONDS.sleep() 是一个简单有效的模拟方式。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class SimulateLongRunningOperation {

    // 模拟从数据库获取用户信息的服务
    public static CompletableFuture<String> getUserInfo(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("正在从数据库获取用户 " + userId + " 的信息...");
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 4)); // 模拟DB查询延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "用户信息: {id: " + userId + ", name: John Doe}";
        });
    }

    // 模拟调用第三方API获取产品价格的服务
    public static CompletableFuture<Double> getProductPrice(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("正在从第三方API获取产品 " + productId + " 的价格...");
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(2, 5)); // 模拟API调用延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return 99.99; // 模拟产品价格
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        // 并行获取用户信息和产品价格
        CompletableFuture<String> userInfoFuture = getUserInfo("abc");
        CompletableFuture<Double> productPriceFuture = getProductPrice("xyz");

        // 当两者都完成后,组合结果
        CompletableFuture<String> combinedFuture = userInfoFuture.thenCombine(productPriceFuture, (userInfo, price) -> {
            System.out.println("所有异步数据获取完成,正在组合信息...");
            return "用户数据: [" + userInfo + "], 产品价格: [" + price + "]";
        });

        // 打印最终结果
        combinedFuture.thenAccept(finalResult -> {
            System.out.println("最终报告: " + finalResult);
        }).join();

        System.out.println("主线程结束。");
    }
}

thenCombine()的核心功能是将当前流(Flow A)与另一个流(Flow B)进行合并。当两个流都产生了元素时,它会使用一个指定的函数对这些元素进行组合,最终生成一个包含组合结果的新流。

CompletableFuture<String> userInfoFuture =getUserInfo("abc");执行后会直接放回一个CompletableFuture<String> 对象,并不会等到内部操作执行完后主线程才进行下一步操作。

异常处理

在异步编程中,异常处理至关重要。CompletableFuture 提供了几种优雅的异常处理机制。

  • exceptionally()

exceptionally()当 CompletableFuture 异常完成时,exceptionally() 会被调用,你可以提供一个备用结果。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class ExceptionHandlingExceptionally {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
            System.out.println("模拟一个可能失败的异步操作...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (ThreadLocalRandom.current().nextBoolean()) {
                System.out.println("操作成功。");
                return "成功结果";
            } else {
                System.err.println("操作失败,抛出异常!");
                throw new RuntimeException("模拟的业务异常:数据获取失败");
            }
        });

        futureWithException
            .exceptionally(ex -> {
                System.err.println("捕获到异常:" + ex.getMessage());
                return "备用结果:操作失败,使用默认值。";
            })
            .thenAccept(result -> {
                System.out.println("处理后的结果:" + result);
            }).join();

        System.out.println("主线程结束。");
    }
}

如果 supplyAsync 中的操作抛出异常,exceptionally() 会捕获它,并提供一个替代值,从而让后续的 thenAccept 继续执行。

  • handle()

handle() 方法类似于 thenApply,但它在异步操作完成(无论是正常完成还是异常完成)时都会被调用。它接收两个参数:结果和异常。如果发生异常,结果为 null;如果没有异常,异常为 null

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class ExceptionHandlingHandle {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("模拟一个可能失败的异步操作...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (ThreadLocalRandom.current().nextBoolean()) {
                return "操作成功的结果";
            } else {
                throw new RuntimeException("模拟的业务异常:网络连接中断");
            }
        });

        future
            .handle((result, ex) -> {
                if (ex != null) {
                    System.err.println("在 handle 中捕获到异常:" + ex.getMessage());
                    return "错误处理结果:" + ex.getMessage();
                } else {
                    System.out.println("在 handle 中处理成功结果:" + result);
                    return "成功处理结果:" + result;
                }
            })
            .thenAccept(finalResult -> {
                System.out.println("最终消费结果:" + finalResult);
            }).join();

        System.out.println("主线程结束。");
    }
}

handle() 的优点是它允许你在一个地方同时处理成功和失败的情况。

  • whenComplete()

whenComplete() 方法也类似于 handle(),它在 CompletableFuture 完成时(正常或异常)被调用。但它不修改 CompletableFuture 的结果,而是执行一个副作用操作。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;

public class ExceptionHandlingWhenComplete {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("模拟一个可能失败的异步操作...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (ThreadLocalRandom.current().nextBoolean()) {
                return "原始的成功结果";
            } else {
                throw new RuntimeException("模拟的业务异常:服务不可用");
            }
        });

        future
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println("在 whenComplete 中发现异常:" + ex.getMessage());
                    // 这里可以记录日志、发送通知等,但不会改变 future 的结果
                } else {
                    System.out.println("在 whenComplete 中发现成功结果:" + result);
                }
            })
            .exceptionally(ex -> { // whenComplete 不会消费异常,异常会继续向下传递
                System.err.println("在 exceptionally 中捕获到异常(来自原始 future):" + ex.getMessage());
                return "备用结果因为原始操作失败";
            })
            .thenAccept(finalResult -> {
                System.out.println("最终消费结果:" + finalResult);
            }).join();

        System.out.println("主线程结束。");
    }
}

流式异常处理(异常传播)

CompletableFuture 链中的异常会沿着链式调用的方向自动传播。这意味着,如果链中的某一步发生异常,后续的所有 thenApply, thenAccept 等操作都不会执行,除非你使用了 exceptionally()handle() 来捕获并处理异常。

代码语言:java
复制
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ChainedExceptionHandler {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("步骤1:开始...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            // 模拟在第一步抛出异常
            if (true) { // 总是抛出异常
                 throw new RuntimeException("步骤1:发生错误!");
            }
            return "数据A";
        });

        CompletableFuture<String> step2 = step1.thenApply(dataA -> {
            System.out.println("步骤2:接收到 " + dataA + ",开始处理...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return dataA + " -> 数据B";
        });

        CompletableFuture<String> step3 = step2.thenApply(dataB -> {
            System.out.println("步骤3:接收到 " + dataB + ",开始处理...");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return dataB + " -> 数据C";
        });

        step3
            .exceptionally(ex -> {
                System.err.println("在链末端捕获到异常:" + ex.getMessage());
                return "错误!链式操作中止。";
            })
            .thenAccept(finalResult -> {
                System.out.println("最终结果:" + finalResult);
            }).join();

        System.out.println("主线程结束。");
    }
}

在上面的例子中,由于 step1 抛出了异常,step2step3thenApply 都不会执行。异常会直接传播到链末端的 exceptionally() 方法。

检查性异常 (Checked Exceptions)

CompletableFuture 本身处理异常的方式是将其包装在 CompletionExceptionExecutionException 中。这意味着,如果你在 supplyAsyncrunAsync 中执行的代码抛出了检查性异常,你需要自己捕获并将其包装成非检查性异常(例如 RuntimeException)。

这是因为函数式接口(如 SupplierRunnable)的抽象方法签名中没有声明检查性异常,所以你不能直接在 lambda 表达式中抛出它们。

代码语言:java
复制
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

public class CheckedExceptionHandling {

    // 模拟一个会抛出检查性异常的方法
    public static String readFileContent() throws IOException {
        System.out.println("正在模拟读取文件...");
        if (System.currentTimeMillis() % 2 == 0) { // 模拟文件不存在或权限问题
            throw new IOException("文件不存在或无法访问!");
        }
        return "文件内容:Hello CompletableFuture!";
    }

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始...");

        CompletableFuture<String> fileReadFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return readFileContent(); // 调用可能抛出检查性异常的方法
            } catch (IOException e) {
                // 必须在这里捕获检查性异常,并将其包装为非检查性异常
                // 否则编译会失败
                throw new RuntimeException("读取文件时发生错误: " + e.getMessage(), e);
            }
        });

        fileReadFuture
            .exceptionally(ex -> {
                Throwable cause = ex.getCause(); // 获取原始异常
                if (cause instanceof IOException) {
                    System.err.println("捕获到 IOException:" + cause.getMessage());
                } else {
                    System.err.println("捕获到其他异常:" + ex.getMessage());
                }
                return "文件读取失败,使用默认内容。";
            })
            .thenAccept(content -> {
                System.out.println("处理后的文件内容:" + content);
            }).join();

        // 另一种直接处理异常的方式是使用 get() 或 join(),但它们会抛出 ExecutionException/CompletionException
        try {
            // 注意:这里需要 try-catch ExecutionException
            String result = fileReadFuture.get();
            System.out.println("通过 get() 获取的结果:" + result);
        } catch (ExecutionException e) {
            System.err.println("通过 get() 捕获到 ExecutionException:" + e.getCause().getMessage());
        }


        System.out.println("主线程结束。");
    }
}

总结处理检查性异常的两种方式:

  1. 在 lambda 内部 try-catch 并包装成 RuntimeException: 这是推荐的方式,因为它可以让 CompletableFuture 链继续使用 exceptionally() 等进行统一的异常处理。
  2. 在调用 get()join() 时捕获 ExecutionException / CompletionException: 这种方式会阻塞当前线程,并且你需要在调用处显式处理这些非检查性异常。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 并发
  • 并行
  • 区别
  • 四种任务处理方式(纯并发 / 并发 - 并行 / 并行 - 并发 / 纯并行)
  • “抽象泄露”:看似简单的工具,藏着一堆麻烦
  • 纯函数式语言也救不了?
  • 并发的意义
  • 并发的超能力
    • 并发的适用场景与影响因素
    • 并发的问题与挑战
    • 实现并发的方法
  • java并发的四句格言
  • 并行流(Parallel Streams)
  • 创建与运行
    • 为什么不用 Thread 而用 ExecutorService?
    • 最常用的两种线程池
      • 1. 单线程池(SingleThreadExecutor)
      • 2. 缓存线程池(CachedThreadPool)
    • 任务的两种类型:Runnable 和 Callable
      • 用单线程池执行 Runnable 任务(顺序执行)
      • 用缓存线程池执行 Callable 任务(并行计算)
    • 注意事项
  • 如何优雅地终止耗时的并发任务
    • 为啥需要终止任务?
    • 别用 “硬中断”,容易出乱子
    • 推荐方法:给任务设个 “停止信号”
    • 用 AtomicBoolean 保证信号靠谱
  • CompletableFuture类
    • 什么是 CompletableFuture?
    • 创建CompletableFuture
    • 获取结果
    • 结合 CompletableFuture
    • 链式调用
    • 组合多个 CompletableFuture
    • 模拟耗时操作
    • 异常处理
    • 流式异常处理(异常传播)
    • 检查性异常 (Checked Exceptions)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档