前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >有了Future为什么还要CompletableFuture?

有了Future为什么还要CompletableFuture?

作者头像
cheese
发布2024-02-06 08:18:07
840
发布2024-02-06 08:18:07
举报
文章被收录于专栏:Java PorterJava Porter

Future 接口理论知识复习

Future 接口概述

  • Future 接口(FutureTask 实现类)定义了异步任务执行的一些方法
    • 获取异步任务执行的结果
    • 取消异步任务执行
    • 判断任务是否被取消,
    • 判断任务执行是否完毕等

场景描述

  • 主线程让一个子线程去执行任务,子线程可能比较耗时,如果没有实现异步任务执行,主线程只能一直等待
  • Future 接口支持了异步任务执行之后,子线程开始执行任务的同时,主线程继续执行自身任务,等到主线程或者子线程任务完成之后,主线程才会获取子线程任务执行结果
  • 上课买水案例…

小结

Future 接口可为主线程开启一个分支任务,专门为主线程处理耗时和废力的复杂业务

Future 接口常用实现类 FutureTask 异步任务

  • Future 接口是 Java5 新增的一个接口,提供了一种异步并行计算的功能
    • 若主线程需要执行一些很耗时的计算任务,可以通过 future 把该任务放到异步线程中去执行
    • 主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果

Future 的作用

  • 异步多线程任务执行且返回有结果,三个特点
    • 多线程
    • 有返回
    • 异步任务
  • 为什么是 Future?

Futrue 编码测试

代码语言:javascript
复制
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        System.out.println(futureTask.get());
    }
}

class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() ");
        return "hello Callable";
    }
}
image.png
image.png

优缺点分析

优点
  • futrue+线程池异步多线程任务配合,能显著提高代码的执行效率
  • 串型执行
代码语言:javascript
复制
 //3个任务,目前只有一个线程main来处理,请问耗时多少?

        long startTime = System.currentTimeMillis();
        //暂停毫秒
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");

        System.out.println(Thread.currentThread().getName() + "\t -----end");
image.png
image.png
  • 使用 futureTask+线程池异步多线程任务
代码语言:javascript
复制
     ExecutorService threadPool = Executors.newFixedThreadPool(3);

        long startTime = System.currentTimeMillis();

        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task1 over";
        });
        threadPool.submit(futureTask1);

        FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task2 over";
        });
        threadPool.submit(futureTask2);

        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());

        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");


        System.out.println(Thread.currentThread().getName() + "\t -----end");
        threadPool.shutdown();
image.png
image.png
缺点
  • get 的获取容易阻塞
代码语言:javascript
复制
         FutureTask<String> futureTask = new FutureTask<String>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t -----come in");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "task over";
        });
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");

        System.out.println(futureTask.get());
image.png
image.png
  • get容易导致阻塞,一般建议放在程序后面,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,容易程序堵塞。
代码语言:javascript
复制
        System.out.println(futureTask.get());
        System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
image.png
image.png
  • 假如我不愿意等待很长时间,我希望过时不候,可以自动离开.
代码语言:javascript
复制
        System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
        System.out.println(futureTask.get(3,TimeUnit.SECONDS));
  • 超过 3 秒结束线程抛出 TimeOutException
image.png
image.png
  • 轮询耗费 CPU
代码语言:javascript
复制
        while (true) {
            if (futureTask.isDone()) {//futureTask执行完成
                System.out.println(futureTask.get());
                break;
            } else {
                //暂停毫秒,未完成,持续等待
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
            }
        }
image.png
image.png
小结

  • get(),一旦调用 get 方法求结果,如果计算没有完成,容易导致线程阻塞
  • isDone()轮询
    • 轮询的方式会消耗无谓的 CPU 资源,而且也不见得能及时得到计算结果
    • 如果想要异步获取,通常都会以轮询的方式去获取结果,尽量不使用阻塞
  • Future 对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到结果

面对一些复杂的任务

对于简单的业务场景使用 Future 接口完全 OK
回调通知
  • 应对 Future 的完成时间,完成之后发起回调通知
  • 通过轮询方式去判断任务是否完成,非常不优雅,也占用 CPU
创建异步任务
  • Future+线程池配合
多个任务前后依赖可以组合
  • 若想将多个异步任务的计算结果组合起来,则后一个异步任务的计算结果,需要前一个异步任务的值
  • 将两个或多个异步计算合成一个异步计算,这几个异步计算,互相独立,同时后面这个又依赖于前一个处理的结果
对计算速度选最快
  • 当 Future 集合中某个任务最快结束时,返回结果,返回第一名处理结果
CompletableFuture 应运而生
  • 使用 Future 接口提供的 API,处理不够优雅
image.png
image.png
  • CompletableFuture 以声明式方式优雅的处理这些需求同时规避 Future 自身获取计算结果的弊端

CompletableFuture 对 Future 的改进

CompletableFuture 为什么会出现?

  • get()方法在 Future 计算完成之前会一直处于阻塞状态下
  • isDone()方法容易耗费 CPU 资源
  • 对于真正在异步处理中我们希望可以通过传入回调函数,在 Future 结束时自动回调该函数,这样就不需要等待结果
  • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的 CPU 资源
  • 因此,JDK8 设计出 CompletableFuture
    • CompletableFuture 提供了一种与观察者模式类似的机制,可以让任务执行完成后通知监听的一方

CompletableFuture 与 CompletionStage 源码

类继承架构
代码语言:javascript
复制
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{}
image.png
image.png
接口 CompletionStage
  • CompletionStage 代表异步计算过程中的某个阶段,一个阶段完成以后会触发另一个阶段(类似于 Linux 管道分隔符传参数)
  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable
代码语言:javascript
复制
//示例如下
stage
.thenApply(x->square(x))
.thenAccept(x->System.out.print(x))
.thenRun(()->System.out.println())
  • 一个阶段的执行可能被单个阶段的完成触发,也可能有多个阶段一起触发
类 CompletionFuture
  • Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,提供了转化和组合 CompletionFuture 的方法
  • 它可能代表了一个明确完成 Future,也可能代表一个完成阶段 CompletionStage,它支持在计算完成之后触发一些函数或执行某些动作
  • 实现了 Future 和 CompletionStage 接口

核心的四个静态方法,创建一个异步任务

  • 为什么要不用 new CompletionFuture()方式创建异步任务
image.png
image.png
  • API 中说明通过 new CompletionFuture()方式会创建一个不完备的 CompletionFuture,官方也不推荐使用该方式
runAsync 方法—无返回值
  • public static CompletableFuture<Void> runAsync(Runnable runnable)
    • 使用默认的 ForkJoinPool 线程池
代码语言:javascript
复制
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println(completableFuture.get());
image.png
image.png
  • public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
    • 使用定义的线程池对象
代码语言:javascript
复制
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },threadPool);

        System.out.println(completableFuture.get());
        threadPool.shutdown();
image.png
image.png
supplyAsync 方法—有返回值
  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
代码语言:javascript
复制
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        });
        System.out.println(completableFuture.get());
image.png
image.png
  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
代码语言:javascript
复制
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        },threadPool);
        System.out.println(completableFuture.get());

        threadPool.shutdown();
image.png
image.png
关于参数 Executor 说明
  • 没有指定 Executor 的方法,直接默认使用 ForkJoinPool.commonPool()作为线程池,作为它的线程池执行异步代码
  • 若指定线程池,则使用自定义或者特别定义的线程池执行异步代码
减少阻塞和轮询
  • 从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,减少阻塞和轮询,
  • 可以传入回调对象,当异步任务完成或者发生异常时,自动回调对象的回调方法
  • 使用 CompletableFuture 实现 Future 的功能
代码语言:javascript
复制
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----1秒钟后出结果:" + result);
            return result;
        });

        System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");

        System.out.println(completableFuture.get());
image.png
image.png
  • CompletableFuture.supplyAsync()完成异步编程返回结果
代码语言:javascript
复制
try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            })
            .whenComplete((v, e) -> {//v为上述计算完成的result,e为异常
                if (e == null) { //没有异常
                    System.out.println("-----计算完成,更新系统UpdateValue:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });

            System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
image.png
image.png
  • 解释下为什么默认线程池关闭,自定义线程池记得关闭?
  • 用户线程中,程序执行完成需要 1 秒钟,main 线程执行太快,在 ForkJoinPool 线程池中若发现 main 线程执行完成则会关闭线程池
  • 解决方法
    • 将 main 线程延迟 1 秒,在用户线程的 try/catch/finally 代码之后添加睡眠代码
代码语言:javascript
复制
        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
image.png
image.png
  • 或是使用定义的线程对象,或是自定义线程对象
代码语言:javascript
复制
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            },threadPool).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("-----计算完成,更新系统UpdateValue:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });

            System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
image.png
image.png
  • 编写异常代码测试
image.png
image.png
image.png
image.png
CompletableFuture 的优点
  • 异步任务结束时,会自动调用对象的方法
  • 主线程设置好回调之后,不在关系异步任务的执行,异步任务之间可以顺序进行
  • 异步任务出错时,会自动调用某个对象的方法
代码语言:javascript
复制
        try {

            //调用异步任务,传入线程池对象
            asyncTask(threadPool);

            System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

        //...主线程

         void asyncTask(ExecutorService threadPool) {
                     //...业务的逻辑
                    return result;
                }, threadPool).whenComplete((v, e) -> {
                    //回调接口
                    callInterface(v, e);
                }).exceptionally(e -> {
                    //异常接口
                    e.printStackTrace();
                    exceptionHandel(e);
                    return null;
                });
            }

电商网站比价需求案例

函数式编程已经主流

  • 大厂面试题
image.png
image.png

Lambda 表达式+Stream 流式应用+Chain 链式调用+Java8 函数式编程

Runnable
  • 无参数,无返回值
代码语言:javascript
复制
package java.lang;

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
Function
  • Function<T,R>接受一个参数,并且有返回值
代码语言:javascript
复制
@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
}
Consumer
  • Consumer 接受一个参数,没有返回值
代码语言:javascript
复制
@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}
  • BiConsumer<T,U>接受两个参数,没有返回值
代码语言:javascript
复制
@FunctionalInterface
public interface BiConsumer<T, U> {
    void accept(T t, U u);
}
  • 在回调 CompletableFuture.whenComplete 方法中进行调用
image.png
image.png
Supplier
  • 供给型函数式接口,没有参数,有一个返回值
代码语言:javascript
复制
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}
Summer
image.png
image.png

先说说 join 和 get 的对比

  • get 在程序编译时会检查异常,join 在程序编译时不会检查异常,此外于 get 基本等价
代码语言:javascript
复制
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            return "join and get";
        });
        System.out.println(supplyAsync.join());
image.png
image.png
  • 说说你过去工作中的项目亮点!!!(面试必备)

大厂业务需求说明

  • 切记,先完成功能再到性能的逐步迭代
  • 电商网站比价需求分析
代码语言:javascript
复制
案例说明:电商比价需求,模拟如下情况:
   1. 需求:
     1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
     1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
      
   2. 输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
       《mysql》 in jd price is 88.05
       《mysql》 in dangdang price is 86.11
       《mysql》 in taobao price is 90.43

      
  3. 解决方案,对别同一个商品在各个平台上的价格,要求获得一个清单列表
    3.1. step by step,按部就班查完jd,查taobao,查完taobao查天猫.....
    3.2. all in ,使用多线程,异步任务执行同时查询多个平台

     
  4. 技术要求
       3.1 函数式编程
       3.2 链式编程
       3.3 Stream流式计算

Java8 函数式编程在 Case 中的应用

  • 创建资源
    • 电商网站类
代码语言:javascript
复制
//电商网站类
class NetMall {
    /**
     * 电商网站名 jd,pdd taobao...
     */
    @Getter
    private String netMallName;

    /**
     * 构造方法
     * @param netMallName
     */
    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    /**
     * 售价
     * @param productName
     * @return
     */
    public double calcPrice(String productName) {
        try {
            //查询需要1秒钟
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //模拟价格
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}
  • 查询电商网站
代码语言:javascript
复制
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );
方案一,step by step
  • 使用流式计算,查询返回结果
代码语言:javascript
复制
 /**
     * step by step 一家家搜查
     * List<NetMall> ----->map------> List<String>
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list, String productName) {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()//流式计算
                .map(netMall -> //映射为map集合
                        //字符串格式化
                        String.format(productName + " in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
  • 测试计算结果
代码语言:javascript
复制
 public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
 }
image.png
image.png
方案二,asyncExecutor
  • 基于CompletableFuture.supplyAsync
代码语言:javascript
复制
/**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     *
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
        return list
                .stream()
                .map(netMall ->
                        CompletableFuture.supplyAsync(() -> 
                                String.format(productName + " in %s price is %.2f",
                                    netMall.getNetMallName(),
                                    netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }
  • 两次流式映射
image.png
image.png
代码语言:javascript
复制
public static void main(String[] args) {
        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime2 - startTime2) + " 毫秒");
}
效果比较
image.png
image.png

CompletableFuture 的常用方法

获得结果和触发计算

get()
代码语言:javascript
复制
 /**
     * 获得结果和触发计算
     *
     * @throws InterruptedException
     * @throws ExecutionException
     */
    private static void group1() throws InterruptedException, ExecutionException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        System.out.println(completableFuture.get());
    }
image.png
image.png
get(long time,TimeUnit unit)
代码语言:javascript
复制
System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
image.png
image.png
join()
代码语言:javascript
复制
System.out.println(completableFuture.join());
image.png
image.png
getNow(String valueIfAbsent)
代码语言:javascript
复制
System.out.println(completableFuture.getNow("xxx"));
image.png
image.png
  • 源码解读
    • 当调用 getNow 时,计算完成,获取计算结果
    • 当调用 getNow 时,计算未完成,返回备选值(valueIfabsent)
image.png
image.png
  • 异步任务执行 1s,主线程 2s,在 ,异步任务在 2s 内执行完成,返回结果给 getNow
代码语言:javascript
复制
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(completableFuture.getNow("xxx"));
image.png
image.png
complete(T value)
代码语言:javascript
复制
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                //执行2s
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "abc";
        });

        //执行1s
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
image.png
image.png
  • 源码说明
image.png
image.png
  • 主线程调用异步任务时
    • 计算未完成,返回 true,同时将 value 作为 result 给到主线程
    • 计算完成,返回 false,同时将异步任务的计算结果给到主线程
  • 将异步任务与主线程的睡眠时间互换,得到以下结果
image.png
image.png

对计算结果进行处理

  • 此处 supplyAsync 若不使用指定线程池,主线程执行完会直接结束 jvm
thenApply
  • 计算结果存在依赖关系,这两个线程串行化
  • demo
代码语言:javascript
复制
private static void thenApply1(ExecutorService threadPool) {
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        }, threadPool).thenApply(f -> {
            System.out.println("222");
            return f + 2;
        }).thenApply(f -> {
            System.out.println("333");
            return f;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("----计算结果: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");
        threadPool.shutdown();
    }
image.png
image.png
  • f=1+2=3
  • 异常处理
    • 计算过程中出现异常,thenApply(),会直接终止计算
image.png
image.png
image.png
image.png
handle
  • 计算结果存在依赖关系,这两个线程串行化
  • demo
代码语言:javascript
复制
private static void handle1(ExecutorService threadPool) {
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        }, threadPool).handle((f, e) -> {
            System.out.println("222");
            return f + 2;
        }).handle((f, e) -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("----计算结果: " + v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");

        threadPool.shutdown();
    }
image.png
image.png
  • 异常处理
    • 有异常时,跳过异常代码,带着异常参数继续执行后续代码
image.png
image.png
image.png
image.png
Summary
image.png
image.png

对计算结果进行消费

  • 接受任务的处理结果,并消费处理,无返回结果
demo
  • 源码解读
image.png
image.png
  • 调用了 Consumer 接口,传入参数无返回值
代码语言:javascript
复制
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f ->{
            return f + 2;
        }).thenApply(f ->{
            return f + 3;
        }).thenAccept(System.out::println);
    }
image.png
image.png
补充
  • thenRun 不调用前置计算的结果
image.png
image.png
  • thenAccpet 获取前置计算结果,最终不返回记过,consumer 直接消费
image.png
image.png
  • thenApply,获取前置计算结果,最终返回所有计算结果
image.png
image.png
CompletableFuture 和线程池说明
  • 以 thenRun 和 thenRunAsync 为例
代码语言:javascript
复制
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
                return "abcd";
            }, threadPool).thenRunAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
            });
            System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
image.png
image.png
  • 将所有的方法都统一为 thenRun
image.png
image.png
  • 将入口睡眠代码注释
image.png
image.png
  • 结论
    • 没有传入自定义线程池,都默认使用 ForkJoinPool
    • 传入一个指定线程池之后
      • 执行第一个任务时,传入指定线程池
        • 调用 thenRun 方法执行第二个任务时,则第一个任务和第二个任务共用同一个线程池
        • 调用 thenRunAsync 执行第二个任务时,则第一个任务用指定线程池,第二个任务用 ForkJoinPool
    • 有可能处理太快,系统优化切换原则直接使用 main 线程处理
    • 其它 thenAccept 与 thenAccpetAsync,thenApply 和 thenApplyAsync 等,之间的区别亦是同理
  • 源码解读
image.png
image.png
image.png
image.png
  • 调用 thenXxxxAsync 方法默认都会调用一个 ForkJoinPool.commonPool()

对计算速度进行选用

  • 谁快用谁
  • applyToEither
代码语言:javascript
复制
    public static void main(String[] args) {
        //开启两个异步任务
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        });

        //比较两个异步任务,返回率先完成计算的异步任务的结果
        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winer";
        });

        System.out.println(Thread.currentThread().getName() + "\t" + "-----: " + result.join());
    }
image.png
image.png
1707042003737.png
1707042003737.png

对计算结果进行合并

  • 两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 进行处理
  • 先完成的先等待,所有分支完成后执行 thenCombine
  • 拆分方式
代码语言:javascript
复制
  public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 
              = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 
              = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> result 
              = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("-----开始两个结果合并");
            return x + y;
        });

        System.out.println(result.join());
    }
image.png
image.png
  • 函数式接口方式
代码语言:javascript
复制
private static void interfaceChain() {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t"+"come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName() + "\t"+"come in 2");
            return 20;
        }),(x,y)->{
            System.out.println(Thread.currentThread().getName() + "\t"+"x + y = a =" +(x+y));
            return x + y ;
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName() + "\t"+"come in 3");
            return 30;
        }),(a,b)->{
            System.out.println(Thread.currentThread().getName() + "\t"+"a + b = " +(a+b));
            return a+b;
        });
        System.out.println("---主线程结束,END");
        System.out.println(thenCombineResult.join());
    }
image.png
image.png
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-02-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Future 接口理论知识复习
    • Future 接口概述
      • 场景描述
        • 小结
        • Future 接口常用实现类 FutureTask 异步任务
          • Future 的作用
            • Futrue 编码测试
              • 优缺点分析
                • 优点
                • 缺点
                • 小结
              • 面对一些复杂的任务
                • 对于简单的业务场景使用 Future 接口完全 OK
                • 回调通知
                • 创建异步任务
                • 多个任务前后依赖可以组合
                • 对计算速度选最快
                • CompletableFuture 应运而生
            • CompletableFuture 对 Future 的改进
              • CompletableFuture 为什么会出现?
                • CompletableFuture 与 CompletionStage 源码
                  • 类继承架构
                  • 接口 CompletionStage
                  • 类 CompletionFuture
                • 核心的四个静态方法,创建一个异步任务
                  • runAsync 方法—无返回值
                  • supplyAsync 方法—有返回值
                  • 关于参数 Executor 说明
                  • 减少阻塞和轮询
                  • CompletableFuture 的优点
              • 电商网站比价需求案例
                • 函数式编程已经主流
                  • Lambda 表达式+Stream 流式应用+Chain 链式调用+Java8 函数式编程
                    • Runnable
                    • Function
                    • Consumer
                    • Supplier
                    • Summer
                  • 先说说 join 和 get 的对比
                    • 大厂业务需求说明
                      • Java8 函数式编程在 Case 中的应用
                        • 方案一,step by step
                        • 方案二,asyncExecutor
                        • 效果比较
                    • CompletableFuture 的常用方法
                      • 获得结果和触发计算
                        • get()
                        • get(long time,TimeUnit unit)
                        • join()
                        • getNow(String valueIfAbsent)
                        • complete(T value)
                      • 对计算结果进行处理
                        • thenApply
                        • handle
                        • Summary
                      • 对计算结果进行消费
                        • demo
                        • 补充
                        • CompletableFuture 和线程池说明
                      • 对计算速度进行选用
                        • 对计算结果进行合并
                        相关产品与服务
                        流计算 Oceanus
                        流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档