使用CompletableFuture构建异步应用(二)

本文主要介绍Java 8 中的异步处理的方式,主要是 CompletableFuture类的一些特性。 为了展示CompletableFuture的强大特性,我们会创建一个名为“最佳价格查询器” (best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。

  • 首先,你会学到如何为你的客户提供异步API。(如果你拥有一间在线商店的话,这是非常有帮助的)。
  • 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线 商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该 商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
  • 你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的 商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用 户可能遭遇白屏)。

获取商品价格的同步方法

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    /**
     * 获取产品价格的同步方法
     * @param product 产品名称
     * @return 产品价格
     */
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        //一个模拟的延迟方法
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay() {
        int delay = 1000;
        //int delay = 500 + RANDOM.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public String getName() {
        return name;
    }
}

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对 网络中的所有商店都要重复这种操作。在本文的下个小节中,你会了解如何以异步方式使用同 步API解决这个问题。

将同步方法转换为异步方法

我们使用新的CompletableFuture类来将getPrice方法转换为异步的getPriceAsync方法。

    /**
     * 异步的获取产品价格
     *
     * @param product 产品名
     * @return 最终价格
     */
    public Future<Double> getPriceAsync(String product) {
        //创建CompletableFuture 对象,它会包含计算的结果
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        //在另一个线程中以异步方式执行计算
        new Thread(() -> {
            double price = calculatePrice(product);
            //需长时间计算的任务结 束并得出结果时,设置 Future的返回值
            futurePrice.complete(price);
        }).start();
        // 无需等待还没结束的计算,直接返回Future对象
        return futurePrice;
    }

在这段代码中,你创建了一个代表异步计算的CompletableFuture对象实例,它在计算完 成时会包含计算的结果。 使用这个API的客户端,可以通过下面的这段 代码对其进行调用。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ShopMain {

  public static void main(String[] args) {
    Shop shop = new Shop("BestShop");
    long start = System.nanoTime();
    //查询商店,试图 取得商品的价格
    Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
    long invocationTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Invocation returned after " + invocationTime 
                                                    + " msecs");
    // 执行更多任务,比如查询其他商店
    doSomethingElse();
    // 在计算商品价格的同时
    try {
        //从Future对象中读 取价格,如果价格 未知,会发生阻塞
        double price = futurePrice.get();
        System.out.printf("Price is %.2f%n", price);
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
    long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Price returned after " + retrievalTime + " msecs");
  }

  private static void doSomethingElse() {
      System.out.println("Doing something else...");
  }
}

Output: Invocation returned after 43 msecs Price is 123.26 Price returned after 1045 msecs

你会发现getPriceAsync方法的调用返回远远早于最终价格计算完成的时间。接下来我们看看如何正确地管理 异步任务执行过程中可能出现的错误。

错误处理

如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误 会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制 在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结 果的客户端永久地被阻塞。 解决这种问题的方法有两种:

  1. 客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。
  2. 通过异步处理中发生的异常,根据不同的异常类型来进行不同的处理。

为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用 CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问 题的异常抛出。代码如下所示:

    /**
     * 抛出CompletableFuture内的异常版本的getPriceAsyncForException方法
     *
     * @param product 产品名
     * @return 最终价格
     */
    public Future<Double> getPriceAsyncForException(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                //如果价格计算正常结束,完成Future操作并设置商品价格
                futurePrice.complete(price);
            } catch (Exception ex) {
                //否则就抛出导致失败的异常,完成这 次Future操作
                futurePrice.completeExceptionally(ex);
            }

        }).start();
        return futurePrice;
    }

如果该方法抛出了一个运 行时异常“product not available”,客户端就会得到像下面这样一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product not available
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
 at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
        ... 5 more
    Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23) at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source) at java.lang.Thread.run(Thread.java:744)

目前为止我们已经了解了如何通过编程创建CompletableFuture对象以及如何获取返回值了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏崔庆才的专栏

看完这篇文章还会不懂Python装饰器?掐死小编吧

28660
来自专栏个人分享

Flink单机版安装与wordCount

Flink为大数据处理工具,类似hadoop,spark.但它能够在大规模分布式系统中快速处理,与spark相似也是基于内存运算,并以低延迟性和高容错性主城,其...

18010
来自专栏牛客网

面经总结

面试记录 头条 - 一面 - 自我介绍 - 连续子数组的最大和 - 二叉树任意两个节点之间路径的最大长度 - 二叉树的深度 - 一面上个周只记得这么多了 - 二...

41370
来自专栏有趣的Python和你

千里之行,始于足下变量字符串

11430
来自专栏tkokof 的技术,小趣及杂念

HGE系列之一 初窥门道

对于游戏开发有些兴趣的朋友,尤其是那些至今都扔不下2D、如我这般的志士同仁,想必对于HGE都有所耳闻,但如果要论及深入了解与运用,那恐怕就寥寥无几人了,而对于...

12020
来自专栏Android开发经验

ExpandableStickyListHeadersListView遇到的一个问题

15640
来自专栏牛客网

51信用卡前端凉面

18600
来自专栏安恒网络空间安全讲武堂

护网杯REFINAL——write up

根据前面的一些信息,判断出长度为0x30,经过如下设置,我们可以很快开始进行动态调试。

18920
来自专栏racaljk

关于llvm kaleidoscope: 记一次Debug血泪之路

简而言之,慎(bu)用(yong)全局变量!                                

15510
来自专栏华章科技

入门科普:什么时候要用Python?用哪个版本?什么时候不能用?

Python使用面向对象编程(object-oriented programming,OOP)和构造,你可以像任何其它面向对象的语言一样使用它,譬如Java。

20420

扫码关注云+社区

领取腾讯云代金券