可以把 “处理器” 想象成 “干活的人”,任务就是要做的活儿:
简单说,就是 “本以为能用一个简单的工具搞定复杂问题,结果工具底下的细节全冒出来了,躲都躲不掉”。
比如,你想用一个 “并发编程库” 写程序,以为它会帮你处理好所有底层细节(比如 CPU 怎么分工、数据怎么同步),结果写着写着就发现:
这些底层细节(CPU 怎么工作、数据怎么传递)本来该被工具 “藏起来”,结果全漏出来捣乱,这就是 “抽象泄露”。并发 / 并行编程几乎都这样,这不是Java特有的 - 这是并发和并行编程的本质。
有人觉得 “纯函数式语言”(比如 Haskell)能避免这些问题,因为它的代码逻辑更严谨,不容易出数据混乱。
确实,它能解决很多并发问题,但也不是万能的。比如你用它写了一个 “消息队列”(比如处理用户订单的排队系统):
最后你会发现:不管用什么语言,都得把底层细节(队列大小、处理速度、硬件限制)摸透,不然系统迟早出问题。并发 / 并行编程的本质就是这样 —— 看似是 “同时干多件事”,实则处处是坑,必须精打细算。
并发性是一系列性能技术,专注于减少等待
1.不要这样做
2.没有什么是真的,一切可能都有问题
上述涉及的主题非常复杂,本文无法解释,自行了解这些内容(并建议参考《Java Concurrency in Practice》)。
3.它起作用,并不意味着它没有问题
隐蔽性问题的典型例子是竞态条件(Race Condition) 导致的偶发错误,这类问题在常规测试中可能难以显现,却会在高并发的极端场景下暴露,以下是一个具体案例:
假设有一个简单的计数器程序,多个线程同时对一个共享变量count
进行自增操作(count++
)。从表面上看,这段代码逻辑清晰,在单线程环境下完全正常,但在多线程并发时,却可能因为操作的非原子性出现问题。
count++
的底层操作可拆解为三步:
count
的值;count
。当两个线程同时执行这一操作时,可能出现以下极端情况:
count
的值为 10;count
的值也为 10;count
;count
。原本预期两次自增后count
应为 12,但实际结果却是 11,这就是典型的竞态条件导致的错误。这种问题在线程数量少、执行频率低时(如测试环境)很难触发,一旦部署到高并发场景(如大量用户同时操作),就会频繁出现数据不一致的问题,且由于错误具有偶发性,排查难度极大。
这个例子直观体现了并发程序中 “看似正常却存在隐蔽问题” 的特点 —— 代码在表面上没有语法错误,常规测试也可能表现正常,但在极端并发条件下会暴露深层缺陷,进而引发用户可见的问题(如数据错误、逻辑异常等)。
这种问题 “难触发” 的核心原因在于:错误是否出现取决于线程执行的 “时间窗口” 是否恰好重合,而在低并发场景下,这个窗口出现的概率极低。
以计数器例子具体说明:
count
值(即 “时间窗口重合”)的概率非常小。多数情况下,线程 A 会完整执行 “读 - 加 - 写” 三步后,线程 B 才开始操作,结果会是正确的 12。而在高并发场景下(比如 100 个线程同时高频操作),线程间的执行顺序会变得极度混乱,“时间窗口重合” 的概率会急剧升高,错误就会频繁暴露。
因此,这类问题的隐蔽性就在于:它不是 “必然出错”,而是 “可能出错”,在低并发测试中容易被漏检,直到高并发环境才会集中爆发。
4.你仍然必须理解它
并行流的好处:
parallel()
就能让流并行处理,利用多处理器加快速度。比如找质数的例子里,用了parallel()
后,速度差不多是不用时的 3 倍,确实很划算。parallel () 不是万能的:
range()
生成序列再并行求和,速度很快;但如果用iterate()
生成序列,再加parallel()
,不仅可能更慢,数据量大了还会耗光内存。parallel () 和 limit () 一起用容易出问题:
Stream.generate()
生成序列,加parallel()
和limit(10)
,本来想要 10 个数,结果内部可能生成 1024 个,还可能出现随机顺序的结果。这是因为并行时多个线程会预取很多值,最后只取前 10 个,新手容易踩坑。总结:并行流看起来简单,加个parallel()
就行,但其实有很多门道。不能随便乱加,得先弄明白并行到底能不能帮上忙,最好先测试再用。记住 “先让程序跑起来,再考虑提速,而且只在必须的时候才优化”。
写并发代码,几乎不会直接用new Thread()
,而是用ExecutorService
(线程池),原因很简单:
里根据是否需要返回结果,选择不同的任务类型:
类型 | 特点 | 场景 |
---|---|---|
Runnable | 无返回值,用run()方法 | 只需要执行动作(比如打印日志) |
Callable | 有返回值,用call()方法 | 需要计算结果(比如统计订单金额) |
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
// 1. 定义任务(实现Runnable,无返回值)
class PrintTask implements Runnable {
private int id;
public PrintTask(int id) {
this.id = id;
}
@Override
public void run() {
// 模拟任务执行(比如处理数据)
try {
Thread.sleep(100); // 休眠100ms,模拟耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务" + id + "完成,线程:" + Thread.currentThread().getName());
}
}
public class SingleThreadDemo {
public static void main(String[] args) {
// 2. 创建单线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 3. 提交10个任务
IntStream.range(0, 10).forEach(i -> executor.execute(new PrintTask(i)));
// 4. 关闭线程池(重要:用完必须关,否则程序不会结束)
executor.shutdown();
}
}
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// 1. 定义有返回值的任务(实现Callable)
class SumTask implements Callable<Integer> {
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public Integer call() {
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println("计算" + start + "-" + end + ",线程:" + Thread.currentThread().getName());
return sum;
}
}
public class CachedThreadPoolDemo {
public static void main(String[] args) throws Exception {
// 2. 创建缓存线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 3. 提交3个计算任务(分别计算1-100,101-200,201-300的和)
List<SumTask> tasks = IntStream.range(0, 3)
.mapToObj(i -> new SumTask(i * 100 + 1, (i + 1) * 100))
.collect(Collectors.toList());
// 4. 执行所有任务并获取结果(invokeAll会等待所有任务完成)
List<Future<Integer>> results = executor.invokeAll(tasks);
// 5. 汇总结果
int total = 0;
for (Future<Integer> result : results) {
total += result.get(); // get()会获取返回值,若未完成则等待
}
System.out.println("总和:" + total); // 结果应为45150
// 6. 关闭线程池
executor.shutdown();
}
}
shutdown()
(等待任务完成后关闭),否则程序会一直运行。val
)会出问题,尽量让任务独立(像 Demo2 的 Callable,每个任务自己计算,不共享数据)。get()
会 “等结果”,可能影响效率,企业里现在更推荐用 Java8 的CompletableFuture
(后续会讲)。AtomicBoolean
(原子布尔值),不能用普通的boolean
。因为多个线程同时改一个普通布尔值可能出问题(比如两个线程同时点 “停止”,结果信号没传对),而AtomicBoolean
能防这种混乱。简单来说,CompletableFuture
实现了 Future
和 CompletionStage
接口。CompletableFuture 本质上代表了一个将来会完成的异步操作。 你可以把它想象成一个“占位符”或者“承诺”,它承诺在未来某个不确定的时间点会给你一个结果。这个结果可能是成功的值,也可能是操作中出现的异常。
它既可以作为生产者(提供结果),也可以作为消费者(处理结果)。
CompletableFuture
对象。这个对象就代表了未来这个任务的执行结果。当任务完成时,它会将结果“放入”这个 CompletableFuture
中。CompletableFuture
上。当生产者完成并将结果放入时,这些回调函数就会被自动触发执行,从而处理这个结果。异步的核心在于非阻塞。虽然消费者最终确实需要生产者的结果才能继续处理,但它们不必同步地等待。以下几点体现了 CompletableFuture
的异步性:
CompletableFuture
启动一个异步任务时,例如 supplyAsync()
或 runAsync()
,程序会立即得到一个 CompletableFuture
对象,而不会停下来等待任务执行完成。你的主线程可以继续执行其他操作,而不会被阻塞。CompletableFuture
提供了丰富的API,允许你将多个异步操作串联起来(链式操作),或者将它们并行执行,从而构建复杂的异步流程。例如,你可以说“当任务A完成后,就去执行任务B;如果任务B也完成了,就去执行任务C”。所有这些步骤都可以在不阻塞主线程的情况下进行。supplyAsync()
: 如果你的异步操作会返回一个结果,可以使用 supplyAsync()
。它接受一个 Supplier
函数式接口。import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class BasicUsage {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
// 模拟一个耗时操作,返回一个字符串
Supplier<String> supplier = () -> {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 4)); // 模拟1-3秒延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "异步操作的结果";
};
CompletableFuture<String> future = CompletableFuture.supplyAsync(supplier);
// 注册回调函数,当异步操作完成时执行
future.thenAccept(result -> {
System.out.println("异步操作完成,结果是:" + result);
});
System.out.println("主线程继续执行,不等待异步操作...");
// 为了演示,让主线程等待一下异步操作完成
future.join(); // 阻塞当前线程直到CompletableFuture完成并获取结果
// 也可以使用 future.get(); 但它会抛出检查性异常
// future.get(5, TimeUnit.SECONDS); // 可以设置超时
System.out.println("主线程结束。");
}
}
在这个例子中,supplyAsync()
会在一个新的线程中执行 supplier
提供的逻辑。thenAccept()
用于注册一个回调,当异步操作完成并获得结果时,这个回调就会被执行。join()
方法会阻塞当前线程直到 CompletableFuture 完成并返回结果。
runAsync()
: 如果你的异步操作不返回任何结果(类似 void
方法),可以使用 runAsync()
。它接受一个 Runnable
函数式接口。import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RunAsyncExample {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2); // 模拟2秒延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("异步操作完成,没有返回结果。");
});
System.out.println("主线程任务不需要等待,执行主线的任务。");
future.join(); // 等待异步操作完成
System.out.println("主线程结束。");
}
}
join()
: 阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果 CompletableFuture 异常完成,它会抛出一个 CompletionException。get()
: 阻塞当前线程,直到 CompletableFuture 完成并返回结果。与 join()
不同,get()
会抛出 InterruptedException 和 ExecutionException(一个检查性异常,包装了实际的异常)。thenAccept
, thenApply
, thenRun
等): 这是非阻塞获取结果的方式。你可以在 CompletableFuture 完成时注册一个操作。
thenAccept(Consumer<? super T> action)
: 消费结果,不返回新的 CompletableFuture。thenApply(Function<? super T, ? extends U> fn)
: 将结果转换成另一种类型,并返回一个新的 CompletableFuture。thenRun(Runnable action)
: 不关心结果,只在 CompletableFuture 完成时执行一个操作。CompletableFuture 的强大之处在于它允许你将多个异步操作组合在一起,形成一个复杂的异步工作流。
thenApply
, thenAccept
, thenCompose
等方法允许你链式调用异步操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class CombineCompletableFutures {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> fetchUserData = CompletableFuture.supplyAsync(() -> {
System.out.println("第一步:正在获取用户数据...");
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "用户ID:123, 名称:张三";
});
CompletableFuture<Double> calculateDiscount = fetchUserData.thenApplyAsync(userData -> {
System.out.println("第二步:根据用户数据计算折扣...");
// 假设从用户数据中提取ID并计算折扣
String userId = userData.split(",")[0].split(":")[1];
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return Double.parseDouble(userId) * 0.01; // 模拟折扣
});
CompletableFuture<String> applyDiscountAndGenerateInvoice = calculateDiscount.thenApplyAsync(discount -> {
System.out.println("第三步:应用折扣并生成发票...");
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "发票已生成,折扣为:" + String.format("%.2f", discount * 100) + "%";
});
applyDiscountAndGenerateInvoice.thenAccept(invoice -> {
System.out.println("最终结果:" + invoice);
}).join(); // 等待所有链式操作完成
System.out.println("主线程结束。");
}
}
有时你需要等待多个独立的 CompletableFuture 都完成后再进行下一步操作。
allOf()
: 等待所有给定的 CompletableFuture 完成。它返回一个 CompletableFuture<Void>
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class AllOfExample {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "任务1完成";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "任务2完成";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "任务3完成";
});
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
try {
// 当所有任务都完成后,才能安全地获取它们的结果
System.out.println(task1.join());
System.out.println(task2.join());
System.out.println(task3.join());
System.out.println("所有任务都已完成!");
} catch (Exception e) {
System.err.println("有任务异常终止: " + e.getMessage());
}
}).join();
System.out.println("主线程结束。");
}
}
anyOf()
: 任何一个给定的 CompletableFuture 完成时就完成。它返回一个 CompletableFuture<Object>
,其结果是第一个完成的 CompletableFuture 的结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class AnyOfExample {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 3));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "这是最快的任务!";
});
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(4, 6));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "这个任务有点慢。";
});
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(fastTask, slowTask);
anyTask.thenAccept(result -> {
System.out.println("最先完成的任务结果是: " + result);
}).join();
System.out.println("主线程结束。");
}
}
在实际开发中,CompletableFuture 经常用于模拟网络请求、数据库操作、文件读写等耗时任务。通过 TimeUnit.SECONDS.sleep()
是一个简单有效的模拟方式。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class SimulateLongRunningOperation {
// 模拟从数据库获取用户信息的服务
public static CompletableFuture<String> getUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("正在从数据库获取用户 " + userId + " 的信息...");
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 4)); // 模拟DB查询延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "用户信息: {id: " + userId + ", name: John Doe}";
});
}
// 模拟调用第三方API获取产品价格的服务
public static CompletableFuture<Double> getProductPrice(String productId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("正在从第三方API获取产品 " + productId + " 的价格...");
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(2, 5)); // 模拟API调用延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return 99.99; // 模拟产品价格
});
}
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
// 并行获取用户信息和产品价格
CompletableFuture<String> userInfoFuture = getUserInfo("abc");
CompletableFuture<Double> productPriceFuture = getProductPrice("xyz");
// 当两者都完成后,组合结果
CompletableFuture<String> combinedFuture = userInfoFuture.thenCombine(productPriceFuture, (userInfo, price) -> {
System.out.println("所有异步数据获取完成,正在组合信息...");
return "用户数据: [" + userInfo + "], 产品价格: [" + price + "]";
});
// 打印最终结果
combinedFuture.thenAccept(finalResult -> {
System.out.println("最终报告: " + finalResult);
}).join();
System.out.println("主线程结束。");
}
}
thenCombine()
的核心功能是将当前流(Flow A)与另一个流(Flow B)进行合并。当两个流都产生了元素时,它会使用一个指定的函数对这些元素进行组合,最终生成一个包含组合结果的新流。
CompletableFuture<String> userInfoFuture =getUserInfo("abc");执行后会直接放回一个CompletableFuture<String> 对象,并不会等到内部操作执行完后主线程才进行下一步操作。
在异步编程中,异常处理至关重要。CompletableFuture 提供了几种优雅的异常处理机制。
exceptionally()
exceptionally()
当 CompletableFuture 异常完成时,exceptionally()
会被调用,你可以提供一个备用结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class ExceptionHandlingExceptionally {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
System.out.println("模拟一个可能失败的异步操作...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (ThreadLocalRandom.current().nextBoolean()) {
System.out.println("操作成功。");
return "成功结果";
} else {
System.err.println("操作失败,抛出异常!");
throw new RuntimeException("模拟的业务异常:数据获取失败");
}
});
futureWithException
.exceptionally(ex -> {
System.err.println("捕获到异常:" + ex.getMessage());
return "备用结果:操作失败,使用默认值。";
})
.thenAccept(result -> {
System.out.println("处理后的结果:" + result);
}).join();
System.out.println("主线程结束。");
}
}
如果 supplyAsync
中的操作抛出异常,exceptionally()
会捕获它,并提供一个替代值,从而让后续的 thenAccept
继续执行。
handle()
handle()
方法类似于 thenApply
,但它在异步操作完成(无论是正常完成还是异常完成)时都会被调用。它接收两个参数:结果和异常。如果发生异常,结果为 null
;如果没有异常,异常为 null
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class ExceptionHandlingHandle {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("模拟一个可能失败的异步操作...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (ThreadLocalRandom.current().nextBoolean()) {
return "操作成功的结果";
} else {
throw new RuntimeException("模拟的业务异常:网络连接中断");
}
});
future
.handle((result, ex) -> {
if (ex != null) {
System.err.println("在 handle 中捕获到异常:" + ex.getMessage());
return "错误处理结果:" + ex.getMessage();
} else {
System.out.println("在 handle 中处理成功结果:" + result);
return "成功处理结果:" + result;
}
})
.thenAccept(finalResult -> {
System.out.println("最终消费结果:" + finalResult);
}).join();
System.out.println("主线程结束。");
}
}
handle()
的优点是它允许你在一个地方同时处理成功和失败的情况。
whenComplete()
whenComplete()
方法也类似于 handle()
,它在 CompletableFuture 完成时(正常或异常)被调用。但它不修改 CompletableFuture 的结果,而是执行一个副作用操作。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class ExceptionHandlingWhenComplete {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("模拟一个可能失败的异步操作...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (ThreadLocalRandom.current().nextBoolean()) {
return "原始的成功结果";
} else {
throw new RuntimeException("模拟的业务异常:服务不可用");
}
});
future
.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("在 whenComplete 中发现异常:" + ex.getMessage());
// 这里可以记录日志、发送通知等,但不会改变 future 的结果
} else {
System.out.println("在 whenComplete 中发现成功结果:" + result);
}
})
.exceptionally(ex -> { // whenComplete 不会消费异常,异常会继续向下传递
System.err.println("在 exceptionally 中捕获到异常(来自原始 future):" + ex.getMessage());
return "备用结果因为原始操作失败";
})
.thenAccept(finalResult -> {
System.out.println("最终消费结果:" + finalResult);
}).join();
System.out.println("主线程结束。");
}
}
CompletableFuture 链中的异常会沿着链式调用的方向自动传播。这意味着,如果链中的某一步发生异常,后续的所有 thenApply
, thenAccept
等操作都不会执行,除非你使用了 exceptionally()
或 handle()
来捕获并处理异常。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ChainedExceptionHandler {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("步骤1:开始...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// 模拟在第一步抛出异常
if (true) { // 总是抛出异常
throw new RuntimeException("步骤1:发生错误!");
}
return "数据A";
});
CompletableFuture<String> step2 = step1.thenApply(dataA -> {
System.out.println("步骤2:接收到 " + dataA + ",开始处理...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return dataA + " -> 数据B";
});
CompletableFuture<String> step3 = step2.thenApply(dataB -> {
System.out.println("步骤3:接收到 " + dataB + ",开始处理...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return dataB + " -> 数据C";
});
step3
.exceptionally(ex -> {
System.err.println("在链末端捕获到异常:" + ex.getMessage());
return "错误!链式操作中止。";
})
.thenAccept(finalResult -> {
System.out.println("最终结果:" + finalResult);
}).join();
System.out.println("主线程结束。");
}
}
在上面的例子中,由于 step1
抛出了异常,step2
和 step3
的 thenApply
都不会执行。异常会直接传播到链末端的 exceptionally()
方法。
CompletableFuture 本身处理异常的方式是将其包装在 CompletionException
或 ExecutionException
中。这意味着,如果你在 supplyAsync
或 runAsync
中执行的代码抛出了检查性异常,你需要自己捕获并将其包装成非检查性异常(例如 RuntimeException
)。
这是因为函数式接口(如 Supplier
和 Runnable
)的抽象方法签名中没有声明检查性异常,所以你不能直接在 lambda 表达式中抛出它们。
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
public class CheckedExceptionHandling {
// 模拟一个会抛出检查性异常的方法
public static String readFileContent() throws IOException {
System.out.println("正在模拟读取文件...");
if (System.currentTimeMillis() % 2 == 0) { // 模拟文件不存在或权限问题
throw new IOException("文件不存在或无法访问!");
}
return "文件内容:Hello CompletableFuture!";
}
public static void main(String[] args) throws Exception {
System.out.println("主线程开始...");
CompletableFuture<String> fileReadFuture = CompletableFuture.supplyAsync(() -> {
try {
return readFileContent(); // 调用可能抛出检查性异常的方法
} catch (IOException e) {
// 必须在这里捕获检查性异常,并将其包装为非检查性异常
// 否则编译会失败
throw new RuntimeException("读取文件时发生错误: " + e.getMessage(), e);
}
});
fileReadFuture
.exceptionally(ex -> {
Throwable cause = ex.getCause(); // 获取原始异常
if (cause instanceof IOException) {
System.err.println("捕获到 IOException:" + cause.getMessage());
} else {
System.err.println("捕获到其他异常:" + ex.getMessage());
}
return "文件读取失败,使用默认内容。";
})
.thenAccept(content -> {
System.out.println("处理后的文件内容:" + content);
}).join();
// 另一种直接处理异常的方式是使用 get() 或 join(),但它们会抛出 ExecutionException/CompletionException
try {
// 注意:这里需要 try-catch ExecutionException
String result = fileReadFuture.get();
System.out.println("通过 get() 获取的结果:" + result);
} catch (ExecutionException e) {
System.err.println("通过 get() 捕获到 ExecutionException:" + e.getCause().getMessage());
}
System.out.println("主线程结束。");
}
}
总结处理检查性异常的两种方式:
try-catch
并包装成 RuntimeException
: 这是推荐的方式,因为它可以让 CompletableFuture 链继续使用 exceptionally()
等进行统一的异常处理。get()
或 join()
时捕获 ExecutionException
/ CompletionException
: 这种方式会阻塞当前线程,并且你需要在调用处显式处理这些非检查性异常。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。