前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >提高效率,实现异步编程,我用CompletableFuture(下)

提高效率,实现异步编程,我用CompletableFuture(下)

原创
作者头像
小高先生
修改2024-03-01 06:39:14
2120
修改2024-03-01 06:39:14
举报
文章被收录于专栏:Java并发编程Java并发编程

提高效率,实现异步编程,我用CompletableFuture(下)

大家好,我是小高先生,书接上文,我们继续来学习CompletableFuture。上文我们讲了基础装Future是如何升级为神装CompletableFuture以及如何购买CompletableFuture,接下来我们一起来学习如何在战斗中使用CompletableFuture。

  • CompletableFuture的基本使用
  • CompletableFuture的实战案例
  • CompletableFuture常用方法
  • 结论

CompletableFuture的基本使用

先来看一下常规使用,可以完全替代Future。

代码语言:java
复制
public class CompletableFutureUserDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("1s后出结果 " + result);
            return result;
        });

        System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
        System.out.println(completableFuture.get());
    }
}

我们还是要避免使用get(),毕竟搞不好还是会被阻塞的,这里用一下高级的方法whenComplete()。看下代码:

代码语言:java
复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "----come in");
        int result = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("1s后出结果 " + result);
        return result;
    }).whenComplete((v,e) -> {
        //v表示上一阶段,e是异常
        if(e == null){
            //这里代表一切顺利
            System.out.println("-----计算完成,更新数值: " + v);
        }
	//如果出现异常就跳到这个阶段
    }).exceptionally(e -> {
        e.printStackTrace();
        System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
        return null;
    });

    System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
}

一下子就复杂起来了,不过还好,逻辑是很清楚的。我们还是像之前一样设定任务,然后再调用whenComplete()方法。在这个方法里面,我们可以判断在执行任务过程中是否有异常。但是,当我们运行代码之后,会发现有问题,并没有输出结果。难道是程序有问题吗?

实际上,这个问题的原因是CompletableFuture需要1秒钟来处理任务,但是main方法执行得太快了,还没等任务执行完成,main线程就已经结束了。大家都知道守护线程和用户线程吧,CompletableFuture使用了默认线程池ForkJoinPool,就像守护线程一样。如果main线程结束了,守护线程也会关闭,所以就不会输出了。

为了解决这个问题,我们只需要在最后加上一个小小的延时,让主线程等一下,等移步任务完成就可以看见输出了。

代码语言:java
复制
public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "----come in");
        int result = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("1s后出结果 " + result);
        return result;
    }).whenComplete((v,e) -> {
        //v表示上一阶段,e是异常
        if(e == null){
            //这里代表一切顺利
            System.out.println("-----计算完成,更新数值: " + v);
        }
    }).exceptionally(e -> {
        e.printStackTrace();
        System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
        return null;
    });

    System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
    //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

我们可以通过使用线程池解决上述问题。

代码语言:java
复制
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("1s后出结果 " + result);
                return result;
            },threadPool).whenComplete((v,e) -> {
                //v表示上一阶段,e是异常
                if(e == null){
                    //这里代表一切顺利
                    System.out.println("-----计算完成,更新数值: " + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }


        System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
//        try {
//            TimeUnit.SECONDS.sleep(3);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }

在上述代码中,执行流程首先启动了一个异步任务,该任务将在独立的线程中运行。在这个异步任务中,我们模拟了一个耗时的操作,即让线程休眠1秒钟,并在休眠结束后打印出"任务完成"的消息。当这个耗时的异步任务执行完毕后,程序的控制权将转移到whenComplete()方法。whenComplete()方法接受一个BiConsumer函数式接口作为参数,这个接口有两个输入参数:一个是代表异步计算结果的result,另一个是代表可能发生的异常的exception。

whenComplete()方法体现了CompletableFuture任务的分阶段特性,这是因为CompletableFuture实现了CompletionStage接口。CompletionStage接口代表了异步计算过程中的一个阶段,它定义了一组方法来处理这个阶段的完成情况和结果。在我们的示例中,whenComplete()方法被用来处理异步任务完成后的情况。如果任务成功完成,没有发生异常,那么会打印出"任务正常完成"的消息;如果任务在执行过程中发生了异常,则会捕获这个异常并打印出相应的错误消息。

通过使用whenComplete()方法,我们可以清晰地表达出异步任务完成后要执行的逻辑。

CompleteFuture的实战案例

之前的内容了解过后,我们就学会了CompleteFuture的基本使用方法,接下来一起看看在电商网站比价案例中如何使用CompleteFuture解决问题。下面是案例有关的需求分析。

1.需求说明

2.输出返回

3.解决方案

希望输出结果是同款产品在不同电商平台的价格清单列表,返回一个List<String>。

如下为两种方案的代码,对比可知利用CompleteFuture可以大大提升效率。使用CompleteFuture的方案中我使用了join方法,它和get()方法区别就是使用get()必须抛出异常,而join()不需要,join()在编译期间不会检查异常,会更简洁一些。

代码语言:java
复制
public class CompletableFutureMallDemo {
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("taobao"),
            new NetMall("pdd"));
    public static List<String> getPrice(List<NetMall> list,String productName){
        return list
                .stream()
                .map(netMall ->
                        String.format(productName + "in %s price is %.2f",
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
    public static List<String> getPriceBympColetableFuture(List<NetMall> list,String productName){
        //这里就是把list的每一个元素都交给一个CompleteFuture
        //stream流的作用
        //List<NetMall> ----> List<CompletableFuture<String>> ----> List<String>
        //拆解Stream流
        //第一次Stream:list里的对象为NetMall,表示不同商家,映射为不同CompleteFuture,都是生成价格的任务,得到List<CompletableFuture>
        //第二次Stream:将List<CompletableFuture<String>>变为List<String>
        return list
                .stream()
                .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + "in %s price is %.2f",
                        netMall.getNetMallName(),
                        netMall.calcPrice(productName)))).collect(Collectors.toList())
                .stream()
                .map(s -> s.join()).collect(Collectors.toList());
    }
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list,"mySql");
        for (String s : list1) {
            System.out.println(s);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---costTime: " + (endTime - startTime) + " 毫秒");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list,"mySql");
        for (String s : list2) {
            System.out.println(s);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("---costTime: " + (endTime2 - startTime2) + " 毫秒");
    }
}



class NetMall{
    @Getter
    private String netMallName;
    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

CompleteFuture常用方法

在上一节中,我们主要探讨了如何在项目中应用CompletableFuture,接下来我们将深入探讨CompletableFuture的其他常用方法。

CompletableFuture不仅实现了Future接口,还实现了CompletionStage接口。虽然Future接口只包含五个方法,限制了其功能范围,但CompletionStage接口则提供了一套更为丰富的方法集合,极大地扩展了异步编程的能力。通过实现这两个接口,CompletableFuture完美地融合了Future的基本功能和CompletionStage的高级特性,使其成为一个功能强大且灵活的异步编程工具。

1.获取结果和触发计算

让我们重点关注getNow()和complete()这两个方法,因为在之前的案例中,我们已经体验了CompletableFuture的其他几种方法。

getNow(T valueIfAbsent)方法的主要作用是提供一种非阻塞的方式来获取CompletableFuture的结果。如果CompletableFuture的计算已经完成,那么getNow()将返回实际的计算结果;如果计算尚未完成,那么它会立即返回一个默认值,即传递给getNow()方法的参数valueIfAbsent。这种方式确保了无论计算是否完成,调用者都能立即获得一个值,而无需等待。

complete()方法则用于显式地完成CompletableFuture。如果CompletableFuture尚未完成,调用complete()会终止任何正在进行的计算(如果有的话),并返回true。随后,当调用get()或其他相关的获取结果的方法时,将会返回传递给complete()的参数。然而,如果CompletableFuture已经完成,那么complete()不会干预任何事情,而是返回false,表示没有进行任何操作。

代码语言:java
复制
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "abc";
        });

        //System.out.println(completableFuture.get());
        //System.out.println(completableFuture.get(2,TimeUnit.SECONDS));
        //System.out.println(completableFuture.join());
        //如果计算完成,返回结果值。否则就返回getNow()中传递的参数
        System.out.println(completableFuture.getNow("xxx"));

    }
}
代码语言:java
复制
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "abc";
        });

        TimeUnit.SECONDS.sleep(1);
        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
    }
}

2.对计算结果进行处理

这两个方法,thenApply()和handle(),都是在处理计算结果并且存在依赖关系时使用的,它们以串行化的方式逐步完成异步任务,传入的参数都是上一步的计算结果。这种串行化的处理方式就像我们之前讨论的烤肉过程一样,需要一步一步地按顺序进行。然而,它们在处理异常方面有所不同。thenApply()方法在遇到异常时会停止后续步骤的执行,因为如果当前步骤出现错误,它不会继续往下走。这是一种保守的策略,确保了只有在没有错误的情况下才会进行下一步的处理。相比之下,handle()方法在遇到异常时的行为不同。即使在遇到异常的情况下,它也会往下执行。这是因为handle()方法提供了一个可以处理异常参数的机制,允许我们在发生异常时进一步处理。这种策略提供了更多的灵活性,使得我们能够在异常情况下采取适当的措施,而不是简单地停止整个任务链。

通过使用thenApply()和handle()方法,我们可以更好地控制异步任务的处理流程,根据不同的需求选择适合的方法来应对可能出现的异常情况。

代码语言:java
复制
public class CompletableFutureAPI2Demo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        }).thenApply(f -> {
            System.out.println("222");
            return f + 2;
        }).thenApply(f -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("计算结果:" + v);
            };
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });


        System.out.println(Thread.currentThread().getName() + "主线程忙其他的");

    }
}

看一下有异常的情况,就会终止在某一步骤。

代码语言:java
复制
public class CompletableFutureAPI2Demo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        },executorService).thenApply(f -> {
            System.out.println("222");
            int b = 10 / 0;
            return f + 2;
        }).thenApply(f -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("计算结果:" + v);
            };
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        
        System.out.println(Thread.currentThread().getName() + "主线程忙其他的");
    }
}

再看一下handle()怎么用,它和thenApply()的区别就是多了异常处理,传入参数就需要多加一个exception。

代码语言:java
复制
public class CompletableFutureAPI3Demo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1;
        },executorService).handle((f,e) -> {
            System.out.println("222");
            //int b = 10 / 0;
            return f + 2;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v,e) -> {
            if(e == null){
                System.out.println("计算结果:" + v);
            };
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "主线程忙其他的");
		executorService.shutdown();
    }
}

如果我们添加异常,异常会抛出,但是后面的handle步骤会照常做。

3.对计算结果进行消费

thenAccept()方法作用是接受任务的处理结果,并消费处理,无返回结果。通过代码不难看出,这个方法就是获取前三步处理后的结果,然后输出并且无返回值。

代码语言:java
复制
public class CompletableFutureAPI4Demo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f -> {
            return f + 2;
        }).thenApply(f -> {
            return f + 3;
        }).thenAccept(r -> {
            System.out.println(r);
        });
    }
}

thenAccept() 和 thenApply() 都是 Java 中 CompletableFuture 类的方法,它们用于处理异步计算的结果。不过,它们的用途和行为有所不同:

thenAccept(Consumer<? super T> action): 这个方法接受一个 Consumer 函数式接口作为参数,该接口表示一个接受类型为 T 的输入并执行某种操作的消费者。thenAccept() 方法没有返回值(即返回 void),因此它主要用于执行某些基于异步计算结果的操作,而不关心返回结果。在 thenAccept() 方法内部,通常不会有 return 语句,因为它的目的是消费结果而不是产生新的值。当你使用 thenAccept() 时,你不能直接通过 join() 获取返回值,因为 join() 返回的是计算的结果,而不是 thenAccept() 中的操作结果。如果 thenAccept() 中的操作有返回值,那么这个返回值会被忽略。

thenApply(Function<? super T,? extends U> fn): 与 thenAccept() 不同,thenApply() 方法接受一个 Function 函数式接口作为参数,该接口表示一个接受类型为 T 的输入并返回类型为 U 的结果的函数。thenApply() 方法会返回一个新的 CompletableFuture,其结果是将函数 fn 应用于原始 CompletableFuture 的结果。这意味着 thenApply() 不仅可以消费异步计算的结果,还可以产生一个新的结果。你可以通过 join() 方法获取这个新的结果。

总结一下,thenAccept() 主要用于消费异步计算的结果而不返回任何值,而 thenApply() 则用于对异步计算的结果进行转换并返回一个新的结果。

代码语言:java
复制
public class CompletableFutureAPI4Demo {
    public static void main(String[] args) {
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultB").thenAccept(r -> System.out.println(r)).join());
    }
}

4.对计算速度的选用

completableFuture 类中的一个显著特性是其能够利用 applyToEither 方法高效地处理多个异步任务。该方法的智能之处在于,它不需要等待所有任务完成,而是会选择最先完成的那个任务的结果来进行后续的操作。这种机制允许程序在最短的时间内响应完成的任务,从而提高整体的执行效率。简而言之,applyToEither 方法体现了 CompletableFuture 对异步编程的优化,通过动态选择最快完成的任务结果,避免了不必要的等待,进而加速了程序的执行流程。

代码语言:java
复制
public class CompletableFutureFastDemo {
    public static void main(String[] args) {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        });

        CompletableFuture<String> res = playA.applyToEither(playB, f -> {
            return f + " is winer";
        });

        System.out.println(Thread.currentThread().getName() + "\t" + "----: " + res.join());
    }
}

5.对计算结果进行合并

当两个CompletionStage任务都完成时,我们可以使用thenCombine方法来处理这两个任务的结果。这个方法确保了所有分支任务完成后才会进行下一步操作

具体来说,thenCombine会等待所有的CompletionStage任务都完成。在这个过程中,无论哪个任务先完成,它都会等待其他分支任务的完成。只有当所有的任务都完成后,thenCombine才会接收到所有任务的结果,并将它们一起传递给提供的函数进行处理。

这种设计使得我们能够轻松地组合多个异步计算的结果,而不需要关心任务的完成顺序。

代码语言:java
复制
public class CompletableFutureCombineDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---启动");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> res = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("----开始两个结果合并");
            return x + y;
        });
        System.out.println(res.join());
    }
}

结论

本文继续深入探讨了CompletableFuture的用法,涵盖了基本和常用方法的使用,并通过电商案例来演示在真实场景中如何有效地运用CompletableFuture。

  • 基本方法 whenComplete()方法是CompletableFuture中的一个非阻塞性方法,它不会像get()方法那样导致线程阻塞,从而提供了更好的性能表现。在使用CompletableFuture时,也需要注意线程池的管理。我们可以通过传入自定义线程池来避免主线程过早结束而导致CompletableFuture的任务被中断。
  • 常用方法 通过实现CompletionStage接口,CompletableFuture扩展了自己的功能,增强了对计算结果的处理能力。这使得CompletableFuture不仅能够处理异步任务的结果,还能够以声明式的方式组合多个异步计算。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 提高效率,实现异步编程,我用CompletableFuture(下)
    • CompletableFuture的基本使用
      • CompleteFuture的实战案例
        • 1.需求说明
        • 2.输出返回
        • 3.解决方案
      • CompleteFuture常用方法
        • 1.获取结果和触发计算
        • 2.对计算结果进行处理
        • 3.对计算结果进行消费
        • 4.对计算速度的选用
        • 5.对计算结果进行合并
      • 结论
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档