应对全场景AI框架部署挑战,MindSpore“四招”让你躺平!>>>
Java8新的异步编程方式 CompletableFuture
缘起: 一、Future java5引入了Future模式。Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算。 Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。 Future的接口的五个方法。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future接口的方法介绍如下:
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束 boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计 算超时,将抛出TimeoutException Future的局限性 Future虽然可以实现获取异步执行结果的需求,但是它没有提供通知的机制,我们无法得知Future什么时候完成。 要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。 二、CompletableFuture介绍 Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。 CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。
CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。 CompletableFuture中的静态方法 方法名 描述 runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。异步操作无返回值 runAsync(Runnable runnable, Executor executor) 使用指定的thread pool执行异步代码。使用指定线程池 supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值 supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的thread pool执行异步代码,异步操作有返回值 获取结果 public T get() public T get(long timeout, TimeUnit unit) //设置超时时间 public T getNow(T valueIfAbsent) //如果结果当前已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值,不会阻塞 public T join() //join()返回计算的结果或者抛出一个unchecked异常(CompletionException) 对多个异步任务进行流水线操作 当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的操作 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) //和之前的CompleteableFuture使用相同的线程
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) //新选线程
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) //指定线程池
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) //exceptionally方法返回一个新的CompletableFuture,当原始的 CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算
三、实例
1、简单异步,获取返回值
/**
* @Author: Liu Yue
* @Descripition:
* @Date; Create in 2021/6/10 11:20
**/
public class FutureDemo {
public static void main(String[] args) {
try {
String s = CompletableFuture.supplyAsync(() -> {
return "hello";
}).get();
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
2、等待线程
/**
* @Author: Liu Yue
* @Descripition:
* @Date; Create in 2021/6/10 11:25
**/
public class ComletableFutureDemo {
private static Random rand = new Random();
private static long t = System.currentTimeMillis();
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("begin to start compute");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("end to start compute. passed " + (System.currentTimeMillis() - t)/1000 + " seconds");
return rand.nextInt(1000);
});
Future<Integer> f = future.whenComplete((v, e)->{
System.out.println(v);
System.out.println(e);
});
try {
System.out.println(f.get());
System.in.read();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}catch (IOException e) {
e.printStackTrace();
}
}
}
3、这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture
/**
* @Author: Liu Yue
* @Descripition:
* @Date; Create in 2021/6/10 11:52
**/
public class ComletableFutureDemo2 {
public static void main(String[] args) {
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> future =
integerCompletableFuture.thenCompose(i->{
return CompletableFuture.supplyAsync(() -> {
return (i * 10) + "";
});
});
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}