多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!!
沉下去,再浮上来
,我想我们会变的不一样的
来自朋友圈
作者:徐四喜
CompletableFuture
在Java中CompletableFuture
用于异步编程,异步通常意味着非阻塞,可以使我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
在这种方式中,主线程不会被阻塞,因为子线程是另外一条线程在执行,所以不需要一直等到子线程完成。主线程就可以并行的执行其他任务。这种并行方式,可以极大的提供程序性能。
CompletableFuture
实现了 Future
, CompletionStage
接口。
Future
接口CompletableFuture
就可以兼容现在有线程池框架;CompletionStage
接口是异步编程的接口抽象,里面定义多种异步方法,实现了CompletionStage
多种抽象方法和Future
并与一起使用,从而才打造出了强大的CompletableFuture 类。CompletableFuture
是 Future
API的扩展。
Future表示异步计算的结果。 提供了检查计算是否完成、等待计算完成以及检索计算结果的方法。 结果只能在计算完成后使用get方法检索,必要时阻塞,直到它准备好。 取消由cancel方法执行。 提供了其他方法来确定任务是正常完成还是被取消。 一旦计算完成,就不能取消计算。
Future 的主要缺点如下: (1)不支持手动完成
(2)Future 的结果在非阻塞的情况下,不能执行更进一步的操作
Future
不会通知你它已经完成了,它提供了一个阻塞的 get()
方法通知你结果。你无法给 Future 植入一个回调函数,当 Future
结果可用的时候,用该回调函数自动的调用 Future 的结果。(3)不能够支持链式调用
Future
的执行结果,我们想继续传到下一个 Future
处理使用,从而形成一个链式的调用,这在 Future 中是没法实现的。(4)不支持多个 Future 合并
Futrue
计算 10的平方,另一个Futrue计算100的平方,我没有办法直接将他们合起来。(5)不支持异常处理
Future
的 API 没有任何的异常处理的 api,所以运行时,很有可能无法定位到错误。
Future API:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); //尝试取消此任务的执行。
boolean isCancelled();//如果此任务在正常完成之前被取消,则返回true
boolean isDone(); //如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true
V get() throws InterruptedException, ExecutionException; //获得任务计算结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;//可等待多少时间去获得任务计算结果
}
CompletableFuture
提供了四个静态方法用来创建CompletableFuture对象:
//runAsync 返回void 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//supplyAsync 异步返回一个结果 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
//Supplier 是一个函数式接口,代表是一个生成者的意思
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
场景:主线程里面创建一个 CompletableFuture
,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo1 {
/**
* 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
*
* @param args
*/
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "子线程开始干活");
//子线程睡 5 秒
Thread.sleep(5000);
// //在子线程中完成主线程 如果注释掉这一行代码将会一直停住
future.complete("success");
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
//主线程调用 get 方法阻塞
System.out.println("主线程调用 get 方法获取结果为: " + future.get());
System.out.println("主线程完成,阻塞结束!!!!!!");
}
}
runAsync:返回一个新的 CompletableFuture
,它在运行给定操作后由在ForkJoinPool.commonPool()运行的任务异步完成。
如果你想异步的运行一个后台任务并且不需要任务返回结果,就可以使用runAsync
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo2 {
/**
* 没有返回值的异步任务
*
* @param args
*/
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
System.out.println("子线程启动干活");
Thread.sleep(5000);
System.out.println("子线程完成");
} catch (Exception e) {
e.printStackTrace();
}
});
//主线程阻塞
future.get();
System.out.println("主线程结束");
}
}
supplyAsync:返回任务结果。
CompletableFuture.supplyAsync()
它持有supplier<T>
并且返回CompletableFuture<T>
,T
是通过调用 传入的supplier取得的值的类型。
Supplier<T>
是一个简单的函数式接口,表示supplier的结果。它有一个get()
方法,该方法可以写入你的后台任务中,并且返回结果。
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo2 {
/**
* 有返回值的异步任务
*
* @param args
*/
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程启动干活");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return "子线程任务完成";
});
//主线程阻塞
System.out.println(future.get());
System.out.println("主线程结束");
}
}
/**
* 主线程开始
* 子线程启动干活
* 子线程任务完成
* 主线程结束
*/
当一个线程依赖另一个线程时,可以使用 thenApply
方法来把这两个线程串行化。
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo4 {
private static String action="";
/**
* 线程依赖
* 1、我到了烧烤店,
* 2、开始点烧烤
* 3、和朋友次完烧烤 ,给女朋友带奶茶回去
* @param args
*/
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture<String> future =
CompletableFuture.supplyAsync(() -> {
action="和朋友一起去次烧烤 ";
return action;
}).thenApply(string -> {
return action+"开始点烧烤!!";
}).thenApply(String->{
return action+"和朋友次完烧烤,给女朋友带杯奶茶回去!!";
});
String str = future.get();
System.out.println("主线程结束, 子线程的结果为:" + str);
}
}
/**
* 主线程开始
* 主线程结束, 子线程的结果为:和朋友一起去次烧烤 和朋友次完烧烤,给女朋友带杯奶茶回去!!
*/
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()
和 thenRun()
方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
thenAccept
消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo5 {
private static String action = "";
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture.supplyAsync(() -> {
try {
action = "逛淘宝,想买双鞋 ";
} catch (Exception e) {
e.printStackTrace();
}
return action;
}).thenApply(string -> {
return action + "选中了,下单成功!!";
}).thenApply(String -> {
return action + "等待快递到来";
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + s);
}
});
}
}
/**
主线程开始
子线程全部处理完成,最后调用了 accept,结果为:逛淘宝,想买双鞋 等待快递到来
*/
exceptionally
异常处理,出现异常时触发,可以回调给你一个从原始Future
中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo6 {
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i= 1/0;
System.out.println("子线程执行中");
return i;
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return -1;
});
System.out.println(future.get());
}
}
/**
* 主线程开始
* java.lang.ArithmeticException: / by zero
* -1
*/
使用 handle() 方法处理异常
API提供了一个更通用的方法 - handle()
从异常恢复,无论一个异常是否发生它都会被调用
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务开始");
int i=0/1;
return i;
}).handle((i,ex) -> {
System.out.println("进入 handle 方法");
if (ex != null) {
System.out.println("发生了异常,内容为:" + ex.getMessage());
return -1;
} else {
System.out.println("正常完成,内容为: " + i);
return i;
}
});
thenCompose
合并两个有依赖关系的 CompletableFutures
的执行结果
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo7 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("让num+10;任务开始");
num += 10;
return num;
});
//合并
CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再来一个 CompletableFuture
CompletableFuture.supplyAsync(() -> {
return i + 1;
}));
System.out.println(future.get());
System.out.println(future1.get());
}
}
/**
* 主线程开始
* 让num+10;任务开始
* 20
* 21
*/
thenCombine
合并两个没有依赖关系的 CompletableFutures
任务
package com.crush.juc09;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo8 {
private static Integer sum = 0;
private static Integer count = 1;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("从1+...+50开始");
for (int i=1;i<=50;i++){
sum+=i;
}
System.out.println("sum::"+sum);
return sum;
});
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("从1*...*10开始");
for (int i=1;i<=10;i++){
count=count*i;
}
System.out.println("count::"+count);
return count;
});
//合并两个结果
CompletableFuture<Object> future = job1.thenCombine(job2, new
BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer a, Integer b) {
List<Integer> list = new ArrayList<>();
list.add(a);
list.add(b);
return list;
}
});
System.out.println("合并结果为:" + future.get());
}
}
/**
主线程开始
从1*...*10开始
从1+...+50开始
sum::1275
count::3628800
合并结果为:[1275, 3628800]
*/
allOf 与 anyOf
allOf
: 一系列独立的 future
任务,等其所有的任务执行完后做一些事情
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo9 {
private static Integer num = 10;
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
List<CompletableFuture> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加 10 任务开始");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
System.out.println("减以 10 任务开始");
num = num - 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
System.out.println("除以 10 任务开始");
num = num / 10;
return num;
});
list.add(job4);
//多任务合并
List<Integer> collect =
list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
System.out.println(collect);
}
}
/**主线程开始
乘以 10 任务开始
加 10 任务开始
减以 10 任务开始
除以 10 任务开始
[110, 100, 100, 10]
*/
anyOf
: 只要在多个 future
里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束
package com.crush.juc09;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* @Author: crush
* @Date: 2021-08-23 9:08
* version 1.0
*/
public class CompletableFutureDemo10 {
private static Integer num = 10;
/**
* 先对一个数加 10,然后取平方
* @param args
*/
public static void main(String[] args) throws Exception{
System.out.println("主线程开始");
CompletableFuture<Integer>[] futures = new CompletableFuture[4];
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(5000);
System.out.println("加 10 任务开始");
num += 10;
return num;
}catch (Exception e){
return 0;
}
});
futures[0] = job1;
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(2000);
System.out.println("乘以 10 任务开始");
num = num * 10;
return num;
}catch (Exception e){
return 1;
}
});
futures[1] = job2;
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(3000);
System.out.println("减以 10 任务开始");
num = num - 10;
return num;
}catch (Exception e){
return 2;
}
});
futures[2] = job3;
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
try{
Thread.sleep(4000);
System.out.println("除以 10 任务开始");
num = num / 10;
return num;
}catch (Exception e){
return 3;
}
});
futures[3] = job4;
CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
System.out.println(future.get());
}
}
//主线程开始
//乘以 10 任务开始
//100
本文只是做了一点简单介绍,还需要大家更深入的了解。
最近又开始了JUC的学习,感觉Java内容真的很多,但是为了能够走的更远,还是觉得应该需要打牢一下基础。
最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们
一起学习,一起讨论吧。
你好,我是博主宁在春
,Java学习路上的一颗小小的种子,也希望有一天能扎根长成苍天大树。
希望与君共勉
😁
我们:待别时相见时,都已有所成。