前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CompletableFuture异步编排

CompletableFuture异步编排

作者头像
别团等shy哥发育
发布2023-04-23 11:57:21
7120
发布2023-04-23 11:57:21
举报

CompletableFuture异步编排

1、CompletableFuture异步编排

1.1 为什么需要异步编排

  问题:查询商品详情页的逻辑非常复杂,数据的获取都需要远程调用,必然需要花费更多的时间。

目前我业务中商品详情页包含如下7个方法:

获取sku的基本详情和图片列表 获取实时价格 获取三级分类 获取销售属性和选中状态 获取商品切换数据 获取海报信息 获取平台信息

  上面查询过程都是用OpenFeign服务调用实现的,假设每个远程调用需要1s时间,那么全部执行完需要7s,这对用户来说是难以接受的。

  那如果有多个线程同时执行这7步操作呢,时间是不是就更短了。


1.2 CompletableFuture介绍

FutureJava 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

  在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFutureFutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

image-20230419214958726
image-20230419214958726

1.3 创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

image-20230419215150849
image-20230419215150849

  没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

whenComplete可以处理正常或异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务


whenCompletewhenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

  方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)


代码演示:

代码语言:javascript
复制
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个没有返回值的异步对象
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("没有返回值结果");
        });
        System.out.println(future.get());

        //创建一个有返回值的异步对象
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int a=1/0;
                return 404;
            }
        }).whenComplete(new BiConsumer<Integer, Throwable>() {
            /**
             *whenComplete 和异步对象使用用一个线程
             * @param integer   异步对象执行后的返回值结果
             * @param throwable 异常对象
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {
                System.out.println("whenComplete:"+integer);
                System.out.println("whenComplete:"+throwable);
            }
        }).exceptionally(new Function<Throwable, Integer>() {
            /**
             * 只处理异常的回调
             * @param throwable
             * @return
             */
            @Override
            public Integer apply(Throwable throwable) {
                return null;
            }
        }).whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
            /**
             * whenCompleteAsync跟异步对象有可能不适用同一个线程,由线程池重新分配
             * @param integer
             * @param throwable
             */
            @Override
            public void accept(Integer integer, Throwable throwable) {

            }
        });
    }

}
image-20230419220803592
image-20230419220803592

1.4 线程串行化与并行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

image-20230419225245838
image-20230419225245838

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

在这里插入图片描述
在这里插入图片描述

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

在这里插入图片描述
在这里插入图片描述

  带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

代码语言:javascript
复制
Function<? super T,? extends U> 
T:上一个任务返回结果的类型 
U:当前任务的返回值类型

  代码演示:

代码语言:javascript
复制
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(
                        50,
                        500,
                        30,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10000)
                );

        //创建一个异步任务对象A
        CompletableFuture<Object> futureA = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                return "404";
            }
        },threadPoolExecutor);
        //创建一个B
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @SneakyThrows
            @Override
            public void accept(Object o) {
                    Thread.sleep(500);
                    System.out.println("我是B");
            }
        },threadPoolExecutor);
        //创建一个C
        futureA.thenAcceptAsync(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("我是C");
            }
        },threadPoolExecutor);
    }
}
image-20230419221321514
image-20230419221321514

  这里是测试看是否是并行化,我们让B休眠一会,可以看到先输出C再输出B,说明是并行化。   因为如果是串行化的化,那么即使B休眠一会,那么C也会一直等着,输出顺序为B、C

1.5 多任务组合

代码语言:javascript
复制
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf:等待所有任务完成。

anyOf:只要有一个任务完成。

1.6 优化商品详情页(业务代码)

1.6.1 未优化之前的代码

代码语言:javascript
复制
@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {
    
    @Autowired
    private ProductFeignClient productFeignClient;

    //获取商品详情数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();

        //获取sku的基本详情和图片列表
        SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
        //获取实时价格
        BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);

        //判断
        if(skuInfo!=null){
            //获取三级分类
            BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
            //获取销售属性和选中状态
            List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
            //获取商品切换数据
            Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
            //获取海报信息
            List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());

            resultMap.put("categoryView",categoryView);
            resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            resultMap.put("spuPosterList",spuPosterBySpuId);
        }
        //获取平台信息
        List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
        //处理数据符合要求 List  Obj  key attrName value attrValue
        List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
            Map<String, String> map = new HashMap<>();
            map.put("attrName", baseAttrInfo.getAttrName());
            map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
            return map;
        }).collect(Collectors.toList());

        //存储数据
        resultMap.put("skuInfo",skuInfo);
        resultMap.put("price",skuPrice);
        resultMap.put("skuAttrList",spuAttrList);
        return resultMap;
    }
}

1.6.2 使用CompletableFuture异步编排

配置线程池:

代码语言:javascript
复制
@Configuration
public class ThreadPoolConfig {
    /**
     * 核心线程数
     * 最大线程数
     * 空闲存活时间
     * 时间单位
     * 阻塞队列
     * 默认:
     *  线程工厂
     *  拒绝策略
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){

        return new ThreadPoolExecutor(
                50,
                500,
                30,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10000)
        );
    }
}

实现类改造:

代码语言:javascript
复制
@Service
@SuppressWarnings("all")
public class ItemServiceImpl implements ItemService {

    @Autowired
    private ProductFeignClient productFeignClient;

    @Autowired
    private ThreadPoolExecutor executor;

    //获取商品详情数据
    @Override
    public HashMap<String, Object> getItem(Long skuId) {
        HashMap<String, Object> resultMap=new HashMap<>();

        CompletableFuture<SkuInfo> skuInfoCompletableFuture = CompletableFuture.supplyAsync(new Supplier<SkuInfo>() {
            @Override
            public SkuInfo get() {
                //获取sku的基本详情和图片列表
                SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
                resultMap.put("skuInfo", skuInfo);
                return skuInfo;
            }
        }, executor);

        CompletableFuture<Void> skuPriceCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取实时价格
                BigDecimal skuPrice = productFeignClient.getSkuPrice(skuId);
                resultMap.put("price", skuPrice);
            }
        }, executor);

        //判断
        CompletableFuture<Void> categoryViewCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取三级分类
                BaseCategoryView categoryView = productFeignClient.getCategoryView(skuInfo.getCategory3Id());
                resultMap.put("categoryView",categoryView);
            }
        }, executor);

        CompletableFuture<Void> spuSaleAttrListCheckBySkuCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取销售属性和选中状态
                List<SpuSaleAttr> spuSaleAttrListCheckBySku = productFeignClient.getSpuSaleAttrListCheckBySku(skuId, skuInfo.getSpuId());
                resultMap.put("spuSaleAttrList",spuSaleAttrListCheckBySku);
            }
        }, executor);
        CompletableFuture<Void> skuValueIdsMapCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取商品切换数据
                Map skuValueIdsMap = productFeignClient.getSkuValueIdsMap(skuInfo.getSpuId());
                resultMap.put("valuesSkuJson", JSON.toJSONString(skuValueIdsMap));
            }
        }, executor);

        CompletableFuture<Void> findSpuPosterBySpuIdCompletableFuture = skuInfoCompletableFuture.thenAcceptAsync(new Consumer<SkuInfo>() {
            @Override
            public void accept(SkuInfo skuInfo) {
                //获取海报信息
                List<SpuPoster> spuPosterBySpuId = productFeignClient.findSpuPosterBySpuId(skuInfo.getSpuId());
                resultMap.put("spuPosterList",spuPosterBySpuId);
            }
        }, executor);


        CompletableFuture<Void> attrListCompletableFuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                //获取平台信息
                List<BaseAttrInfo> attrList = productFeignClient.getAttrList(skuId);
                //处理数据符合要求 List  Obj  key attrName value attrValue
                List<Map<String, String>> spuAttrList = attrList.stream().map(baseAttrInfo -> {
                    Map<String, String> map = new HashMap<>();
                    map.put("attrName", baseAttrInfo.getAttrName());
                    map.put("attrValue", baseAttrInfo.getAttrValueList().get(0).getValueName());
                    return map;
                }).collect(Collectors.toList());

                //存储数据
                resultMap.put("skuAttrList", spuAttrList);
            }
        }, executor);

        //多任务组合 -- 所有的异步任务执行完成才是完成
        CompletableFuture.allOf(
                skuInfoCompletableFuture,
                skuPriceCompletableFuture,
                categoryViewCompletableFuture,
                spuSaleAttrListCheckBySkuCompletableFuture,
                skuValueIdsMapCompletableFuture,
                findSpuPosterBySpuIdCompletableFuture,
                attrListCompletableFuture
        ).join();
        return resultMap;
    }
}

  根据是否有返回值决定调用哪个API,然后看有没有依赖关系,有好几个都依赖SkuInfo,所以要用skuInfoCompletableFuture去创建。    我们需要等待每个任务执行完毕之后在返回,所以最后使用allOf方法进行多任务组合。

1.6.3 测试功能是否正常

  这种异步效果其实在高并发下环境下测比较好,我们这里验证功能是否正常就行。

  访问商品详情页:

image-20230419224246988
image-20230419224246988

  查看Redis中的数据

image-20230419224307865
image-20230419224307865

  可以看到,有6个key被缓存,由于我们的价格是实时价格,所以一直查的是数据库,千万别用缓存。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CompletableFuture异步编排
  • 1、CompletableFuture异步编排
    • 1.1 为什么需要异步编排
      • 1.2 CompletableFuture介绍
        • 1.3 创建异步对象
          • 1.4 线程串行化与并行化方法
            • 1.5 多任务组合
              • 1.6 优化商品详情页(业务代码)
                • 1.6.1 未优化之前的代码
                • 1.6.2 使用CompletableFuture异步编排
                • 1.6.3 测试功能是否正常
            相关产品与服务
            云数据库 Redis
            腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档