前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用 CompletableFuture 实现并发短路

利用 CompletableFuture 实现并发短路

原创
作者头像
Im ZT
修改2024-10-03 09:11:32
1100
修改2024-10-03 09:11:32

原文地址:Java多线程:逻辑表达式的短路运算 原文进阶:Java多线程:复杂逻辑表达式的短路运算

一、问题背景

在复杂的业务逻辑处理中,我们经常需要同时处理多个并发任务,比如检查多个条件是否满足,例如:A && B && C && ...。如果一个条件不满足(例如 A 返回了 false,则可以得出最终结果为 false),就没有必要继续等待其他线程执行结束了。问题是,如何在这种情况下快速返回想要的结果?

通过 Java 提供的 CompletableFuture 工具,我们可以实现这一目标。本文将介绍如何利用 CompletableFutureanyOfallOf 方法,快速处理并发任务,满足我们的需求。

二、CompletableFuture 简介

CompletableFuture 是 Java 8 引入的一个强大异步编程工具,它提供了丰富的组合操作,支持多任务并发处理和异步计算。

在某些场景下,我们希望任务在结果不满足某个条件时能尽早终止,而不是等待所有任务结束。然而,CompletableFuture.allOf() 默认要求所有任务都执行完成,无法处理提前返回的情况,而 anyOf() 则会在第一个任务完成后立即返回结果(无论 true 或者 false)。这就要求我们结合两者,设计出更灵活的解决方案。

补充:本文用到了CompletableFuture 在 Java 9 中引入了 orTimeout() 方法,可以方便地处理异步任务的超时问题。如果你还在使用 Java 8,可以自行封装类似的超时机制。

三、方案设计

目标是并发处理多个任务,当任意一个任务返回 false 时,立即终止其他任务并返回结果。我们可以通过 allOf 来并行处理多个任务,同时结合 anyOf 来在短路条件满足时立刻返回结果。

代码结构设计

  • 组合 anyOf 和 allOf:使用 allOf 并发执行所有任务,一旦某个任务返回 false,通过终止线程 terminationFuture 触发 anyOf 机制,提前结束未完成的任务。如果所有任务都返回 true,则正常结束。
  • 原子类 AtomicBoolean:一个线程安全的布尔值,用来存储逻辑表达式的最终结果。
  • 终止线程 terminationFuture:当任意任务返回 false 时,调用 complete() 方法,提前触发 anyOf 机制。

在这个实现中,一旦某个任务返回 false,其他任务将被忽略并提前返回,避免了不必要的资源消耗。以下是代码示例:

代码语言:javascript
复制
// 超时时间
public static final long TIMEOUT = 1000L;
// 条件 A 、B 、C ... 实现
List<Predicate<ExpressionFacts>> expressionList = new ArrayList<>();
// 为了方便演示,使用 newFixedThreadPool 方法,进行线程池创建
ExecutorService executor = Executors.newFixedThreadPool(10);

/**
 * 实现 A && B && C && ... 快速失败
 *
 * @param facts 方法入参
 * @return A && B && C && ... 的最终结果
 */
public boolean evaluateAnd(ExpressionFacts facts) {
    // result 为该方法返回的最终结果,使用 Atomic 以保证变量的原子性操作
    AtomicBoolean result = new AtomicBoolean(true);
    // 终止线程,用来根据不同任务的执行结果决定是否要提前返回
    CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    CompletableFuture.anyOf(terminationFuture, CompletableFuture.allOf(expressionList.stream()
            .map(expression -> CompletableFuture.runAsync(() -> {
                        if (!result.get()) {
                            return; // 如果已经有规则不符合,直接返回
                        }
                        boolean ruleResult = expression.test(facts); // 不同任务执行的结果
                        if (!ruleResult && result.compareAndSet(true, false)) {
                            terminationFuture.complete(null); // 完成终止线程,提前返回多线程结果
                        }
                    }, executor)
                    .orTimeout(TIMEOUT, TimeUnit.MILLISECONDS) // JDK 9 引入的异步超时处理方法
                    .exceptionally(ex -> {
                        // 捕获异常,结果算作 false
                        if (result.compareAndSet(true, false)) {
                            terminationFuture.complete(null);
                        }
                        return null;
                    })).toArray(CompletableFuture[]::new))).join();

    return result.get();
}

/**
 * 实现 A || B || C || ... 快速成功
 *
 * @param facts 方法入参
 * @return A || B || C || ... 的最终结果
 */
public boolean evaluateOr(ExpressionFacts facts) {
    AtomicBoolean result = new AtomicBoolean(false);
    CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    CompletableFuture.anyOf(terminationFuture, CompletableFuture.allOf(expressionList.stream()
            .map(expression -> CompletableFuture.runAsync(() -> {
                        if (result.get()) {
                            return; // 如果已经有规则符合,直接返回
                        }
                        boolean ruleResult = expression.test(facts);
                        if (ruleResult && result.compareAndSet(false, true)) {
                            terminationFuture.complete(null); // 完成终止线程
                        }
                    }, executor)
                    .orTimeout(TIMEOUT, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> {
                        return null;
                    })).toArray(CompletableFuture[]::new))).join();

    return result.get();
}

四、优缺点分析

优点:

  • 高效性:通过并发处理和短路机制,一旦某个任务不满足条件,可以立即返回结果,避免了等待其他任务执行完毕,从而提高了系统的响应效率。
  • 任务不中断:即使短路发生,未完成的任务仍会继续执行,避免了线程被突然中断带来的问题,保持了系统的健壮性。

缺点:

  • 线程资源开销:由于引入了额外的终止线程,这会占用一些系统资源。

五、总结

本文通过结合 CompletableFuture.anyOfallOf 方法,展示了如何处理多任务并发,并通过短路机制实现高效的逻辑求值。例如:A && B && C && ... 以及 A || B || C || ...,并且在满足所有条件的前提下快速响应任务失败/成功。

六、展望

对于更复杂的逻辑表达式,例如 A && (B || C) && !D,虽然我们可以通过 Java 的逻辑运算符来实现短路,但这种操作是顺序执行的,并不能最大化利用多线程的优势。假如 D 条件先执行完毕并且结果为 true,仍需等待 A && (B || C) 执行完,这种方式显然无法充分利用并行计算的潜力。以下是代码示例:

代码语言:javascript
复制
public static void main(String[] args) {
    boolean A = true;  // 可以是条件的执行逻辑,这里简单用结果 true/false 代替
    boolean B = false;
    boolean C = true;
    boolean D = false;

    boolean result = A && (B || C) && !D;
    System.out.println("结果: " + result);
}

是否可以通过多线程并发处理,可以将复杂表达式拆解为多个独立任务并行执行,同时利用短路机制在条件满足时快速返回。

下一章 :我们将深入探讨如何处理更加复杂的逻辑表达式,例如 A && (B || C) && !D 等,并进一步结合多线程优化处理流程,实现更高效的计算逻辑。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题背景
  • 二、CompletableFuture 简介
  • 三、方案设计
    • 代码结构设计
    • 四、优缺点分析
    • 五、总结
    • 六、展望
    相关产品与服务
    GPU 云服务器
    GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于生成式AI,自动驾驶,深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档