前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手把手教学妹CompletableFuture异步化,性能关系直接起飞!

手把手教学妹CompletableFuture异步化,性能关系直接起飞!

作者头像
JavaEdge
发布2021-04-19 16:57:23
1.3K0
发布2021-04-19 16:57:23
举报
文章被收录于专栏:JavaEdge

Guava 的冲击

由于 JDK1.5 Futrure 的 get 方法获取任务结果必须阻塞等待,Google 看不下去了,开发了 Guava 库

代码语言:javascript
复制
 public static void main(String[] args) throws Exception {
        // 装饰器模式
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

        ListenableFuture<String> future = service.submit(new MyCallable());

        // 异步非阻塞:观察者模式  当ListenableFuture完成时,需要执行的程序
        Futures.addCallback(future, new FutureCallback<String>() {
            @Override
            public void onSuccess(@Nullable String s) {

            }

            @Override
            public void onFailure(Throwable throwable) {
                throwable.printStackTrace();
            }
        }, service);

        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result:{}", result);
    }

于是 java 开始慌了,jdk8 赶紧出招,于是直接抄个类似的即可。

用多线程优化性能,其实不过就是将串行操作变成并行操作。在串行转换成并行的过程中,一定会涉及到异步化,如下

代码语言:javascript
复制
// 以下两个方法都是耗时操作
doBizA();
doBizB();

现在是串行的,

为了提升性能,得把它们并行化,那具体实施起来该怎么做呢? 如下,创建俩子线程去执行即可。

代码语言:javascript
复制
new Thread(()->doBizA()).start(); 
new Thread(()->doBizB()).start();

主线程无需等待doBizA、doBizB的执行结果,即doBizA()和doBizB()两个操作是异步的。

异步化,是并行方案得以实施的基础,利用多线程优化性能这个核心方案得以实施的基础。所以异步编程很重要,因为优化性能是大厂的核心需求。JDK8提供CompletableFuture支持异步编程。

CompletableFuture的核心优势

3个任务:

  • 任务1:洗水壶、烧开水
  • 任务2负责洗茶壶、洗茶杯和拿茶叶
  • 任务3负责泡茶 任务3要等待任务1和任务2都完成后才能开始。

下面是代码实现会发现:无需手工维护线程,无需关注给任务分配线程

代码语言:javascript
复制
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1:洗水壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1:烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2:洗茶壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2:洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2:拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return "龙井";
});

//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1:拿到茶叶:" + tf);
    System.out.println("T1:泡茶...");
    return "上茶:" + tf;
  });
//等待任务3执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
代码语言:javascript
复制
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

创建CompletableFuture对象

创建CompletableFuture对象主要靠下面代码中展示的这4个

静态方法

头两个使用默认线程池。

runAsync(Runnable runnable)

Runnable 接口的run()方法没有返回值

supplyAsync(Supplier supplier)

Supplier接口的get()方法有返回值

CompletableFuture默认使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism设置ForkJoinPool线程池的线程数)。

若所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,推荐根据不同业务类型创建不同的线程池,以避免互相干扰。下两个方法可指定线程池

runAsync(Runnable runnable, Executor executor)

supplyAsync(Supplier supplier, Executor executor)

创建完CompletableFuture对象后,会自动异步执行runnable.run()或者supplier.get(),对一个异步操作,我们关注:

  • 异步操作什么时候结束
  • 如何获取异步操作的执行结果

因为CompletableFuture类实现了Future接口,所以这些都是通过Future接口解决的。 CompletableFuture类还实现了CompletionStage接口

CompletionStage接口

任务有时序关系,比如

  • 串行 比如烧水泡茶,其中洗水壶和烧开水
  • 并行 洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间
  • 汇聚 烧开水、拿茶叶这俩任务和泡茶就是汇聚

CompletionStage接口可清晰描述任务之间的这种时序关系,例如

代码语言:javascript
复制
f3 = f1.thenCombine(f2, ()->{}) 

描述的就是一种汇聚关系。烧水泡茶中的汇聚关系是一种

  • AND 聚合关系 AND指所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)

还有

  • OR聚合关系 OR指的是依赖的任务只要有一个完成就可以执行当前任务。

1 串行关系

CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。

thenApply系

fn的类型是接口Function,这个接口里与CompletionStage相关的方法是 R apply(T t)

该方法既能接收参数也支持返回值,所以thenApply系列方法返回的是CompletionStage

thenAccept系

参数consumer的类型是接口Consumer,这个接口里与CompletionStage相关的方法是 void accept(T t)

该方法虽然支持参数,但不支持返回值,所以thenAccept系方法返回值是CompletionStage。

thenRun系

参数是Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage

Async表示异步执行fn、consumer或action。

thenCompose系

这个系列的方法会新创建出一个子流程,最终结果和thenApply系相同。

看如何使用thenApply()。 supplyAsync()启动一个异步流程,之后是两个串行操作。虽然这是一个异步流程,但任务1、2、3是串行执行,即2依赖1的执行结果,3依赖2的执行结果。

2 AND汇聚

主要是thenCombine、thenAcceptBoth和runAfterBoth系接口

3 OR汇聚

主要是applyToEither、acceptEither和runAfterEither系接口

代码语言:javascript
复制
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

如何使用applyToEither()描述OR汇聚关系。

代码语言:javascript
复制
CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

CompletableFuture 中各种关系(并行、串行、聚合),支持的各种场景。 比如:线程A 等待线程B或线程C等待线程A、B 。

其实CountdownLatch、ThreadPoolExecutor 和Future 就是来解决这些关系场景的,现在有了 completableFuture,可以优先考虑使用 CompletableFuture。

4 异常处理

fn、consumer、action的核心方法都不允许抛受检异常,但无法限制它们抛运行时异常,例如下面的代码,执行 1/0 就会出现除0错误的运行时异常 非异步编程里,可以用try/catch捕获并处理异常,异步编程里该如何处理呢?

CompletionStage给出的方案很简单,使用这些方法处理异常和串行操作一样的,而且还支持链式编程。

代码语言:javascript
复制
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
  • exceptionally()类似try/catch中的catch
  • whenComplete()和handle()类似try/finally的finally,无论是否发生异常都会执行whenComplete()中的回调方法consumer和handle()中的回调方法fn whenComplete()不支持返回结果,handle()支持返回结果。

学了这么多,最后来看个例子:

代码语言:javascript
复制
//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf = 
  CompletableFuture.supplyAsync(()->{
    // 在MySQL中查询规则
    return findRuleByJdbc();
  }).thenApply(r -> {
    // 规则校验
    return check(po, r);
});
Boolean isOk = cf.join();

如上代码问题在于:

  • 读数据库属于I/O操作,应定制单独的线程池,避免线程饥饿
  • 查出来的结果做为下一步处理的条件,若结果为空,没有对应处理
  • 异常未处理
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Guava 的冲击
  • CompletableFuture的核心优势
  • 创建CompletableFuture对象
    • 静态方法
      • runAsync(Runnable runnable)
      • supplyAsync(Supplier supplier)
      • runAsync(Runnable runnable, Executor executor)
      • supplyAsync(Supplier supplier, Executor executor)
  • CompletionStage接口
    • 1 串行关系
      • thenApply系
      • thenAccept系
      • thenRun系
      • thenCompose系
    • 2 AND汇聚
      • 3 OR汇聚
      • 4 异常处理
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档