前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CompletableFuture 应用实践

CompletableFuture 应用实践

作者头像
luoxn28
发布2020-12-02 10:24:03
6180
发布2020-12-02 10:24:03
举报
文章被收录于专栏:TopCoderTopCoder
Java8之前,如果要对计算结果进行异步化处理不是很方便,需要借助于Future,并且需要当前线程主动执行Future.get获取运行结果。因此Java8新增了CompletableFuture提供对异步计算的支持,可以通过回调的方式处理计算结果,注意此时执行回调处理的是执行任务的线程。

CompletableFuture 类实现了CompletionStage和Future接口,所以还可以像之前使用Future那样使用CompletableFuture ,尽管已不再推荐这样用了。

首先来看一个问题,如果要执行多个任务,每个任务会返回对应结果,现在需要所有任务执行完毕之后,将这些任务结果统一打印出来,该如何完成呢?注意尽量不要使用业务线程来等待多个任务的结果,也就是不要使用Future.get方式。

那么就可以借助CompletableFuture 类的来实现,每个任务执行完毕后,判断下是否所有的任务执行完毕,如果执行完毕,那么就打印所有的任务返回结果。代码实现如下(3个任务的场景):

代码语言:javascript
复制
public static void main(String[] args) {
    int cnt = 3;
    AtomicInteger taskCnt = new AtomicInteger(cnt);
    DemoTask<String> demoTask = new DemoTask<>(taskCnt, new String[taskCnt.get()]);

    for (int i = 0; i < cnt; i++) {
        CompletableFuture.supplyAsync(() -> {
            String result = format("[supplyAsync] result");

            // random n millisecond
            int ms = new Random().nextInt(100);
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(ms));

            System.out.println(result + String.format("[sleep %s ms]", ms));
            return result;
        }).whenComplete((s, throwable) -> demoTask.afterHandle(s));
    }


    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}

@AllArgsConstructor
static class DemoTask<T> {
    // 注意: taskCnt == resultList.size()
    private AtomicInteger taskCnt;
    private T[] resultList;

    public void afterHandle(T result) {
        int num = taskCnt.decrementAndGet();
        resultList[num] = result;

        if (num == 0) {
            // all done
            System.out.println("all done:");
            for (T t : resultList) {
                System.out.println(" " + t);
            }
        }
    }
}

private static String format(String msg) {
    return String.format("[%s] %s", Thread.currentThread().getName(), msg);
}

输出结果如下:

接下来就一起看下CompletableFuture 类的使用吧~

CompletableFuture的创建

CompletableFuture 类的创建可以自己设置future的result,示例如下:

代码语言:javascript
复制
// 创建一个带result的CompletableFuture
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
future.get();

// 默认创建的CompletableFuture是没有result的,这时调用future.get()会一直阻塞下去直到有result或者出现异常
future = new CompletableFuture<>();
try {
    future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // no care
}

// 给future填充一个result
future.complete("result");
assert "result".equals(future.get());

// 给future填充一个异常
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("exception"));
try {
    future.get();
} catch (Exception e) {
    assert "exception".equals(e.getCause().getMessage());
}

上面的示例是自己设置future的result,一般情况下我们都是让其他线程或者线程池来执行future这些异步任务。除了直接创建CompletableFuture 对象外(不推荐这样使用),还可以使用如下4个方法创建CompletableFuture 对象:

代码语言:javascript
复制
// runAsync是Runnable任务,不带返回值的,如果入参有executor,则使用executor来执行异步任务
public static CompletableFuture<Void>  runAsync(Runnable runnable)
public static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync是待返回结果的异步任务
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

如果入参不带executor,则默认使用ForkJoinPool.commonPool()作为它的线程池执行异步任务,使用示例如下:

代码语言:javascript
复制
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}, executor);
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});

CompletableFuture完成动作

代码语言:javascript
复制
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

action是Action类型,从上面可以看出它既可以处理正常返回值也可以处理异常,whenComplete会在任务执行完成后直接在当前线程内执行action动作,后缀带Async的方法是交给其他线程执行action(如果是线程池,执行action的可能和之前执行异步任务的是同一个线程),入参带executor的交给executor线程池来执行action动作,当发生异常时,会在当前线程内执行exceptionally方法。

CompletableFuture完成动作示例代码:

代码语言:javascript
复制
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + " " + e);
}).exceptionally((e) -> {
    System.out.println("exception " + e);
    return "exception";
});

除了用上面的whenComplete来执行完成动作之外,还可以使用handle方法,该方法可以将上一个CompletableFuture的返回类型转换:

代码语言:javascript
复制
public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

// handle方法示例:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("handle");
    return 1;
});

除了使用handle方法来执行CompletableFuture返回类型转换之外,还可以使用thenApply方法,二者不同的是前者会处理正常返回值和异常,因此可以屏蔽异常,避免继续抛出;而后者只能处理正常返回值,一旦有异常就会抛出。

代码语言:javascript
复制
public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

// thenApply方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});

上面的handle、thenApply都是返回新的CompletableFuture类型,如果是指为了执行某些消费动作而不返回新的CompletableFuture类型,则可以使用thenAccept方法。

代码语言:javascript
复制
public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

// thenAccept方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // 这里的r为Void(null)了
    System.out.println(r);
});

上面的handle、thenApply和thenAppept都是对上一个CompletableFuture返回的结果进行某些操作,那么可不可以对直接对上一个CompletableFuture进行操作呢?其实也是可以的,使用thenAppeptBoth方法即可。注意,thenAppeptBothhandle/thenApply/thenAppep的流程是一样的,只不过thenAppeptBoth的入参可以是另一个CompletableFuture对象。

代码语言:javascript
复制
public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)

// thenAcceptBoth方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

注意,thenAcceptBoth方法是没有返回值的(CompletableFuture<Void>),如果想用thenAcceptBoth这样的功能并且还带有返回值,那么可以可使用thenCombine方法。

代码语言:javascript
复制
public <U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

如果当任务完成时并不想用CompletableFuture的结果,可以使用thenRun方法来执行一个Runnable。

代码语言:javascript
复制
public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)

以上方法都是在方法中返回一个值(或者不返回值),其实还可以返回一个CompletableFuture,是不是很像类的组合一样,没错,就是这样的。

辅助方法 allOf 和 anyOf

allOf方法是当所有的CompletableFuture都执行完后执行计算。

anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

代码语言:javascript
复制
public static CompletableFuture<Void>      allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>    anyOf(CompletableFuture<?>... cfs)

推荐阅读

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CompletableFuture的创建
  • CompletableFuture完成动作
  • 辅助方法 allOf 和 anyOf
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档