专栏首页个人开发聊聊Java中CompletableFuture的使用

聊聊Java中CompletableFuture的使用

多任务并行协作

任务串行执行

结果组合运算

thenCombine和thenCompose

thenAcceptBoth和runAfterBoth

acceptEither、runAfterEither

java9的改进

总结

CompletableFuture是java8引入的一个异步类,它最大的优势是可以在创建的对象中传入一个回调对象,在任务结束后(done或throw exception),自动调用回调对象的回调方法,而不用让主线程阻塞。

多任务并行协作

假如我们要做咖啡,有3个子任务可以并行执行:洗杯子、磨咖啡、烧水,这3步完成后,我们开始泡咖啡。这种需求我们一般怎么实现呢?

下面我们看一下,使用Future是怎么完成这个功能的。

先定义一个线程池:

public class MyThreadPoolExecutor {
    public static ExecutorService getThreadPoolExecutor(){
        return ThreadPoolExecutorFactory.THREAD_POOL;
    }

    private static class ThreadPoolExecutorFactory{
        private static int PROCESSOR_NUM = Runtime.getRuntime().availableProcessors();
        private static final ExecutorService THREAD_POOL = new ThreadPoolExecutor(PROCESSOR_NUM, PROCESSOR_NUM + 1, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
    }
}

我们再来看看使用Future制作咖啡的过程,我们把每个步骤都用一个线程来执行,必须等待前面三步都执行完成后,我们才能开始泡咖啡

public class MakeTea {
    public static void main(String[] args){
        ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
        List<Future> list = new ArrayList<>(3);
        list.add(executor.submit(() -> washCup()));
        list.add(executor.submit(() -> hotWater()));
        list.add(executor.submit(() -> grindCoffee()));
        while (true){
            int i = 0;
            for (Future future : list){
                if (future.isDone()){
                    i ++;
                }
            }
            if (i == list.size()){
                break;
            }
        }
        executor.submit(() -> System.out.println("泡咖啡"));
    System.out.println("我是主线程");
    }

    private static String washCup(){
        System.out.println("洗杯子");
        return "洗杯子";
    }

    private static String hotWater(){
        System.out.println("烧水");
        return "烧水";
    }

    private static String grindCoffee(){
        System.out.println("磨咖啡");
        return "磨咖啡";
    }
}

上面的代码创建3个线程后,因为等待执行结果,阻塞了主线程,等3个步骤都执行完成,主线程才能执行。输出如下:

洗杯子
烧水
磨咖啡
我是主线程
泡咖啡

如果我们使用CompletableFuture来写,要怎么实现呢?代码如下:

public static void main(String[] args){
    ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
    CompletableFuture future1 = CompletableFuture.runAsync(() -> {
        try {
            washCup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future2 = CompletableFuture.runAsync(() -> {
        try {
            hotWater();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future3 = CompletableFuture.runAsync(() -> {
        try {
            grindCoffee();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture.allOf(future1, future2, future3).thenAccept(
            r -> {
                System.out.println("泡咖啡");
            }
    );
    System.out.println("我是主线程");
}

上面代码输出结果如下,可以看到主线程并没有被阻塞

我是主线程
洗杯子
烧水
磨咖啡
泡咖啡

上面的示例是多个任务之间的调度,最后一个任务必须等之前的3个任务都完成后(allOf),才能执行。如果前面3个任务只有一个完成最后一个任务就可以执行,那就用anyOf方法,把上面代码中allOf改成anyOf,其他代码不变,执行结果如下:

我是主线程
洗杯子
泡咖啡
烧水
磨咖啡

注意:

1.anyOf方法返回的是Object对象而不是Void,这是跟allOf的一个很大的区别,我们要配置异常情况的回调对象,在allOf创建的CompletableFuture中是不可以的。看下面代码

CompletableFuture<Object> future4 = CompletableFuture.anyOf(future1, future2, future3);
    future4.thenApply(
            r -> {
                System.out.println("泡咖啡");
                return null;
            }
    );
    future4.exceptionally(e -> {
        e.printStackTrace();
        return null;
});

2.supplyAsync和runAsync区别是前者创建的是CompletableFuture<U>,后者创建的是CompletableFuture<Void>

任务串行执行

CompletableFuture也支持任务串行执行,后面的任务依赖前面任务的执行结果。我们再举一个做果汁的例子,我们分串行三步: 洗水果 —> 切水果 -> 榨汁,看如下代码

public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> washFruit(), executor);
    CompletableFuture<String> future2 = future1.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : cutFruit());
    future2.thenApplyAsync(r -> StringUtils.isBlank(r) ? null : juicing());
    //为了主线程不立刻退出,以便查看结果
    Thread.sleep(100);
}

private static String washFruit() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("洗水果");
    return "洗水果";
}

private static String cutFruit() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("切水果");
    return "切水果";
}

private static String juicing() {
    System.out.println(Thread.currentThread().getName());
    System.out.println("榨汁");
    return "榨汁";
}

上面main函数执行后输出:

pool-1-thread-1
洗水果
ForkJoinPool.commonPool-worker-1
切水果
ForkJoinPool.commonPool-worker-1
榨汁

上面的代码就是一个串行执行的任务,这儿除了thenApply,还有thenAccept(Consumer<? super T> action)和thenRun(Runnable action),这2个方法都不返回执行结果。

注意:上面方法中,thenApply、thenAccept、thenRun都有一个对应的Async方法,区别在于Async方法会从线程池中拿线程执行,而不带Async的方法在当前线程执行。所以如果上面代码中thenApplyAsync换成thenApply,执行结果如下:

pool-1-thread-1
洗水果
pool-1-thread-1
切水果
pool-1-thread-1
榨汁

结果组合运算

thenCombine和thenCompose

thenCombine用于组合2个CompletableFuture,对结果进行运算,下面2段代码都是输出110

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor);
CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (x, y) -> x + y);
System.out.println(future3.get());

thenCompose把第一个CompletableFuture的结果放到第二个CompletableFuture中进行运算

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = future1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r + 100));
System.out.println(future2.get());

上面的组合方法其实用上一节讲的串行执行也可以完成,见如下代码

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = future1.thenApplyAsync(r -> r + 100);
System.out.println(future2.get());

thenAcceptBoth和runAfterBoth

thenAcceptBoth用于对前面2个线程的结果进行组合运算,下面代码输出110

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10, executor);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100, executor);
CompletableFuture<Void> future3 = future1.thenAcceptBoth(future2, (x, y) -> System.out.println(x + y));

runAfterBoth用于等待前面2个线程之后执行第三个线程

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("线程1"), executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("线程2"), executor);
CompletableFuture<Void> future3 = future1.runAfterBothAsync(future2, () -> System.out.println("线程3"));

上面代码输出:

线程1
线程2
线程3

acceptEither、runAfterEither和applyToEither

这三个方法只取组合线程中执行最快的一个结果,看下面代码:

public static void main(String[] args) {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s);
    System.out.println(f3.join());//输出task2

    CompletableFuture<String> f4 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<Void> f6 = f4.runAfterEither(f5, () -> System.out.println("task3"));
    f6.join();//输出task3

    CompletableFuture<String> f7 = CompletableFuture.supplyAsync(()-> getTask1());
    CompletableFuture<String> f8 = CompletableFuture.supplyAsync(() -> getTask2());
    CompletableFuture<Void> f9 = f8.acceptEither(f7, s -> System.out.println(s));
    f9.join();//输出task2
  
}

private static String getTask1(){
    try {
        Thread.currentThread().sleep(2000);
        return "task1";
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
}

private static String getTask2(){
    try {
        Thread.currentThread().sleep(1000);
        return "task2";
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
}

同样注意:上面7个方法都存在对应的Async方法,会从线程池中取线程来执行。

java9的改进

1.可以设置超时时间,超时后给一个默认值,比如下面代码输出100

ExecutorService executor = MyThreadPoolExecutor.getThreadPoolExecutor();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> getNum(), executor);
future1.completeOnTimeout(100, 3000, TimeUnit.MILLISECONDS);
System.out.println(future1.get());

2.增加failedFuture跟之前的completedFuture配对,前者创建一个指定异常的CompletableFuture,后者创建一个指定给定值的CompletableFuture。

public static <U> CompletableFuture<U> failedFuture(Throwable ex)
public static <U> CompletableFuture<U> completedFuture(U ,value)

3.增加了completedStage和failedStage,这2个方法返回CompletableFuture的继承类MinimalStage

public static <U> CompletionStage<U> completedStage(U value)
public static <U> CompletionStage<U> failedStage(Throwable ex)

4.增加了defaultExecutor和newIncompleteFuture,可以让子类自己去实现

public Executor defaultExecutor()
public <U> CompletableFuture<U> newIncompleteFuture()

总结

CompletableFuture类对多线程调度的支持还是挺强大的,本文主要介绍了一些常用的方法,对于其他方法,大家可以查看api或者CompletionStage接口中定义的方法选择使用。

本文分享自微信公众号 - jinjunzhu(gh_1f109b82d301),作者:jinjunzhu

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • redis灵魂拷问:聊一聊AOF日志重写

    redis的AOF日志,是redis持久化的一种方式,它是一种write after log,即先执行命令后记录日志。这样的好处是日志不会记录执行失败的命令,同...

    jinjunzhu
  • go语言学习(三):源码文件

    命令源码文件是程序执行的入口,属于main包,包含无参数无返回结果的main函数,同java类似,同一个包下面不建议存放多个main函数。命令源码文件编译或安装...

    jinjunzhu
  • redis灵魂拷问:聊一聊主从复制缓冲区

    在我之前《redis灵魂拷问:怎样搭建一个哨兵主从集群》搭建的集群主从哨兵集群,有1个主节点和2个从节点环境如下表:

    jinjunzhu
  • 超赞,给你推荐20个使用 Java CompletableFuture的例子!

    这篇文章介绍 Java 8 的 CompletionStage API和它的标准库的实现 CompletableFuture。API通过例子的方式演示了它的行为...

    业余草
  • 关于CompletableFuture的一切,看这篇文章就够了

    之前的文章中,我们讲解了Future, 本文我们将会继续讲解java 8中引入的CompletableFuture的用法。

    程序那些事
  • 基础篇:异步编程不会?我教你啊!CompeletableFuture

    以前需要异步执行一个任务时,一般是用Thread或者线程池Executor去创建。如果需要返回值,则是调用Executor.submit获取Future。但是多...

    潜行前行
  • Java后端开发三年多线程你都懂,问你异步编程你说你没听过???

    以前需要异步执行一个任务时,一般是用Thread或者线程池Executor去创建。如果需要返回值,则是调用Executor.submit获取Future。但是多...

    Java程序猿
  • 《玩家一号》彩蛋解析丨DC、回到未来、光环统统有,宅们的狂欢!

    VRPinea
  • SqlServer查看死锁的存储过程

    跟着阿笨一起玩NET
  • 为什么ABAP整型的1转成string之后,后面会多个空格

    版权声明:本文为博主汪子熙原创文章,未经博主允许不得转载。 https://jerry.bl...

    Jerry Wang

扫码关注云+社区

领取腾讯云代金券