Future 接口可以为主线程开启一个分支任务,专门替主线程处理耗时、费力的复杂业务。
/**
 * Minimal FutureTask demo: wraps a {@link MyThread} callable in a FutureTask, runs it on a
 * thread named "t1" and prints the value obtained from the task.
 */
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> pending = new FutureTask<>(new MyThread());
        new Thread(pending, "t1").start();
        // Fetch the callable's return value, then print it.
        String outcome = pending.get();
        System.out.println(outcome);
    }
}
/**
 * Callable used by the FutureTask demo: announces that it was entered and yields a fixed greeting.
 */
class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        final String greeting = "hello Callable";
        System.out.println("-----come in call() ");
        return greeting;
    }
}
// Three tasks, all handled by the single main thread: how long does that take?
long startTime = System.currentTimeMillis();
// Pause for each task's duration (in milliseconds), one after another.
for (long pauseMillis : new long[]{500, 300, 300}) {
    try {
        TimeUnit.MILLISECONDS.sleep(pauseMillis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
System.out.println(Thread.currentThread().getName() + "\t -----end");
// Two of the three tasks are submitted to a three-thread pool as FutureTasks; the calling thread
// performs the third pause itself after collecting both results, then prints the elapsed time.
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();
try {
    FutureTask<String> futureTask1 = new FutureTask<>(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "task1 over";
    });
    threadPool.submit(futureTask1);
    FutureTask<String> futureTask2 = new FutureTask<>(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "task2 over";
    });
    threadPool.submit(futureTask2);
    System.out.println(futureTask1.get());
    System.out.println(futureTask2.get());
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    long endTime = System.currentTimeMillis();
    System.out.println("----costTime: " + (endTime - startTime) + " 毫秒");
    System.out.println(Thread.currentThread().getName() + "\t -----end");
} finally {
    // Shut the pool down even when get() throws, so its threads are always released.
    threadPool.shutdown();
}
// Build a FutureTask whose callable prints the executing thread's name, sleeps for five seconds
// and returns "task over".
FutureTask<String> futureTask = new FutureTask<String>(() -> {
System.out.println(Thread.currentThread().getName() + "\t -----come in");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});
// Run the task on a dedicated thread named "t1".
Thread t1 = new Thread(futureTask, "t1");
t1.start();
System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
// Untimed get(): the result is requested only after the message above has been printed.
System.out.println(futureTask.get());
// NOTE(review): the statements below look like alternative placements of get() taken from separate
// demo variants (get() before the other work; get() with a time limit) — confirm the intended sequence.
System.out.println(futureTask.get());
System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
System.out.println(Thread.currentThread().getName() + "\t ----忙其它任务了");
// Timed get(): called with a limit of 3 seconds.
System.out.println(futureTask.get(3,TimeUnit.SECONDS));
// Poll isDone() instead of asking for the result straight away: while the task is unfinished,
// pause 500 ms and print a progress message.
while (!futureTask.isDone()) {
    // Not finished yet: pause for some milliseconds and keep waiting.
    try {
        TimeUnit.MILLISECONDS.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("正在处理中,不要再催了,越催越慢 ,再催熄火");
}
// futureTask has finished executing: print its result.
System.out.println(futureTask.get());
// Declaration excerpt: CompletableFuture implements both Future and CompletionStage (body omitted).
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{}
// Example of chaining stages: each call attaches a further step to the previous one.
stage
.thenApply(x->square(x))
.thenAccept(x->System.out.print(x))
.thenRun(()->System.out.println())
public static CompletableFuture<Void> runAsync(Runnable runnable)
// Job without a return value: print the executing thread's name, then pause for one second.
Runnable job = () -> {
    System.out.println(Thread.currentThread().getName());
    // Pause the thread for a few seconds
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
};
// Start the job asynchronously without passing an executor, then print what get() yields.
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(job);
System.out.println(completableFuture.get());
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
// Explicitly created three-thread pool that will execute the job.
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// Job without a return value: print the executing thread's name, then pause for one second.
Runnable job = () -> {
    System.out.println(Thread.currentThread().getName());
    // Pause the thread for a few seconds
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
};
// Start the job on the pool, print what get() yields, then shut the pool down.
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(job, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// Start a value-returning task without passing an executor: it prints the executing thread's
// name, pauses for one second and returns a greeting string.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
// Pause the thread for a few seconds
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
});
// Print the value obtained from the future.
System.out.println(completableFuture.get());
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
// Explicitly created three-thread pool, passed to supplyAsync below.
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// Value-returning task: prints the executing thread's name, pauses for one second and returns
// a greeting string.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
// Pause the thread for a few seconds
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello supplyAsync";
},threadPool);
// Print the value obtained from the future, then shut the pool down.
System.out.println(completableFuture.get());
threadPool.shutdown();
// Asynchronous task: announces itself, picks a random int in [0, 10), pauses for one second,
// prints the value and returns it.
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "----come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----1秒钟后出结果:" + result);
return result;
});
// The calling thread prints its own message first and only then asks for the result via get().
System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
System.out.println(completableFuture.get());
// Callback style without an explicit executor: the result is consumed in whenComplete and
// failures are reported in exceptionally, so the calling thread never calls get().
try {
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "----come in");
        int result = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("-----1秒钟后出结果:" + result);
        return result;
    })
    .whenComplete((v, e) -> { // v is the result computed above, e is the exception
        if (e == null) { // no exception
            System.out.println("-----计算完成,更新系统UpdateValue:" + v);
        }
    }).exceptionally(e -> {
        e.printStackTrace();
        System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
        return null;
    });
    System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
} catch (Exception e) {
    e.printStackTrace();
}
// No executor is created or passed in this snippet, so there is no pool here to shut down.
// Do not let the main thread finish right away, otherwise the default thread pool used by
// CompletableFuture shuts down at once: pause the thread for 3 seconds.
try {
    TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// Callback style on an explicitly created three-thread pool; the pool is shut down in finally.
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
    // Stage 1: announce, pick a random int in [0, 10), pause one second, print and return it.
    CompletableFuture<Integer> computation = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "----come in");
        int randomValue = ThreadLocalRandom.current().nextInt(10);
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-----1秒钟后出结果:" + randomValue);
        return randomValue;
    }, threadPool);
    // Stage 2: report the value only when no exception was passed along.
    CompletableFuture<Integer> reported = computation.whenComplete((value, failure) -> {
        if (failure != null) {
            return;
        }
        System.out.println("-----计算完成,更新系统UpdateValue:" + value);
    });
    // Stage 3: print failure details and substitute null as the result.
    reported.exceptionally(failure -> {
        failure.printStackTrace();
        System.out.println("异常情况:" + failure.getCause() + "\t" + failure.getMessage());
        return null;
    });
    System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
} catch (Exception e) {
    e.printStackTrace();
} finally {
    threadPool.shutdown();
}
// Caller side of the template: start the asynchronous task, continue with other work, and
// always shut the pool down afterwards.
try {
// Invoke the asynchronous task, passing in the thread pool object
asyncTask(threadPool);
System.out.println(Thread.currentThread().getName() + "线程先去忙其它任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
// ... main thread continues
void asyncTask(ExecutorService threadPool) {
//...业务的逻辑
return result;
}, threadPool).whenComplete((v, e) -> {
//回调接口
callInterface(v, e);
}).exceptionally(e -> {
//异常接口
e.printStackTrace();
exceptionHandel(e);
return null;
});
}
package java.lang;
/**
 * Excerpt of the Runnable functional interface: one method that takes no arguments and
 * returns nothing.
 */
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
/**
 * Excerpt of the Function functional interface: one method that takes a {@code T} and
 * returns an {@code R}.
 */
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
/**
 * Excerpt of the Consumer functional interface: one method that takes a {@code T} and
 * returns nothing.
 */
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
/**
 * Excerpt of the BiConsumer functional interface: one method that takes a {@code T} and a
 * {@code U} and returns nothing.
 */
@FunctionalInterface
public interface BiConsumer<T, U> {
void accept(T t, U u);
}
/**
 * Excerpt of the Supplier functional interface: one method that takes no arguments and
 * returns a {@code T}.
 */
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
// Start a task that yields a fixed string, read it back with join() and print it.
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "join and get");
String joined = supplyAsync.join();
System.out.println(joined);
案例说明:电商比价需求,模拟如下情况:
1. 需求:
1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
2. 输出:希望得到同款产品在不同地方的价格清单列表,返回一个List<String>
《mysql》 in jd price is 88.05
《mysql》 in dangdang price is 86.11
《mysql》 in taobao price is 90.43
3. 解决方案:对比同一个商品在各个平台上的价格,要求获得一个清单列表
3.1. step by step,按部就班查完jd,查taobao,查完taobao查天猫.....
3.2. all in ,使用多线程,异步任务执行同时查询多个平台
4. 技术要求
4.1 函数式编程
4.2 链式编程
4.3 Stream流式计算
/**
 * Simulated e-commerce site used by the price-comparison demo.
 */
class NetMall {
    /**
     * Site name, e.g. jd, pdd, taobao...
     */
    private final String netMallName;

    /**
     * Creates a site with the given name.
     *
     * @param netMallName site name
     */
    public NetMall(String netMallName) {
        this.netMallName = netMallName;
    }

    /**
     * Returns the site name passed to the constructor.
     *
     * @return the site name
     */
    public String getNetMallName() {
        return netMallName;
    }

    /**
     * Simulates a price lookup that takes one second.
     *
     * @param productName product to price; its first character is used as the base of the
     *                    simulated price, so it must be non-empty
     * @return the first character's code plus a random amount in [0, 2)
     */
    public double calcPrice(String productName) {
        try {
            // The lookup takes one second
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to callers instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
        // Simulated price
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}
// Sites queried by the price-comparison demo, in the order their results are listed.
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("pdd"),
new NetMall("tmall")
);
/**
 * Step by step: queries the sites one after another.
 * List<NetMall> -----> map -----> List<String>
 *
 * @param list        sites to query
 * @param productName product to price
 * @return one line per site, in list order, of the form
 *         "{productName} in {site} price is {price}" with the price shown to two decimals
 */
public static List<String> getPrice(List<NetMall> list, String productName) {
    // e.g. 《mysql》 in taobao price is 90.43
    return list
            .stream()
            .map(netMall ->
                    // productName is passed as an argument, not spliced into the pattern, so a
                    // '%' inside the name cannot be misread as a format specifier.
                    String.format("%s in %s price is %.2f",
                            productName,
                            netMall.getNetMallName(),
                            netMall.calcPrice(productName)))
            .collect(Collectors.toList());
}
/**
 * Prints the price lines produced by the one-by-one lookup for "mysql", followed by the
 * elapsed time in milliseconds.
 */
public static void main(String[] args) {
    long begin = System.currentTimeMillis();
    getPrice(list, "mysql").forEach(System.out::println);
    long elapsed = System.currentTimeMillis() - begin;
    System.out.println("----costTime: " + elapsed + " 毫秒");
}
/**
 * Queries all sites asynchronously.
 * List<NetMall> -----> List<CompletableFuture<String>> -----> List<String>
 *
 * @param list        sites to query
 * @param productName product to price
 * @return one line per site, in list order, of the form
 *         "{productName} in {site} price is {price}" with the price shown to two decimals
 */
public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName) {
    return list
            .stream()
            .map(netMall ->
                    CompletableFuture.supplyAsync(() ->
                            // productName is passed as an argument, not spliced into the pattern,
                            // so a '%' inside the name cannot be misread as a format specifier.
                            String.format("%s in %s price is %.2f",
                                    productName,
                                    netMall.getNetMallName(),
                                    netMall.calcPrice(productName))))
            // Collect every future first so that all lookups are started before any join.
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}
/**
 * Prints the price lines produced by the CompletableFuture-based lookup for "mysql", followed
 * by the elapsed time in milliseconds.
 */
public static void main(String[] args) {
    long begin = System.currentTimeMillis();
    getPriceByCompletableFuture(list, "mysql").forEach(System.out::println);
    long elapsed = System.currentTimeMillis() - begin;
    System.out.println("----costTime: " + elapsed + " 毫秒");
}
/**
 * Obtaining a result and triggering the computation: starts a task that returns "abc" after a
 * one-second pause and prints the value read through get().
 *
 * @throws InterruptedException propagated from get()
 * @throws ExecutionException   propagated from get()
 */
private static void group1() throws InterruptedException, ExecutionException {
    CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
        // Pause the thread for a few seconds
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return "abc";
    });
    String value = pending.get();
    System.out.println(value);
}
// Other ways of reading the result, shown side by side.
// get with a limit of two seconds:
System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
// join:
System.out.println(completableFuture.join());
// getNow, given "xxx" as its fallback argument:
System.out.println(completableFuture.getNow("xxx"));
// Start a task that returns "abc" after a one-second pause.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Pause the thread for a few seconds
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
// Pause the calling thread for two seconds, longer than the task's one-second pause
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
// Read the result with getNow, supplying "xxx" as the fallback argument.
System.out.println(completableFuture.getNow("xxx"));
// Start a task that returns "abc" after a two-second pause.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// Pause the thread for a few seconds
try {
// Runs for 2s
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
// The calling thread waits 1s, shorter than the task's two-second pause
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
// Print what complete("completeValue") returns, a tab, and then what join() returns.
System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
/**
 * Chains two thenApply steps onto a task that yields 1 after a one-second pause, reports the
 * final value (or the failure), and finally shuts the given pool down.
 *
 * @param threadPool executor that runs the initial task; shut down before returning
 */
private static void thenApply1(ExecutorService threadPool) {
    // Step 1: pause one second on the pool, print "111", yield 1.
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
        // Pause the thread for a few seconds
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("111");
        return 1;
    }, threadPool);
    // Step 2: print "222" and add 2.
    CompletableFuture<Integer> second = first.thenApply(f -> {
        System.out.println("222");
        return f + 2;
    });
    // Step 3: print "333" and pass the value through unchanged.
    CompletableFuture<Integer> third = second.thenApply(f -> {
        System.out.println("333");
        return f;
    });
    third.whenComplete((value, failure) -> {
        if (failure != null) {
            return;
        }
        System.out.println("----计算结果: " + value);
    }).exceptionally(failure -> {
        failure.printStackTrace();
        System.out.println(failure.getMessage());
        return null;
    });
    System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");
    threadPool.shutdown();
}
/**
 * Chains two handle steps onto a task that yields 1 after a one-second pause, reports the
 * final value (or the failure), and finally shuts the given pool down.
 *
 * @param threadPool executor that runs the initial task; shut down before returning
 */
private static void handle1(ExecutorService threadPool) {
    // Step 1: pause one second on the pool, print "111", yield 1.
    CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> {
        // Pause the thread for a few seconds
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("111");
        return 1;
    }, threadPool);
    // Step 2: handle receives both the value and the exception; print "222" and add 2.
    CompletableFuture<Integer> second = first.handle((f, e) -> {
        System.out.println("222");
        return f + 2;
    });
    // Step 3: print "333" and add 3.
    CompletableFuture<Integer> third = second.handle((f, e) -> {
        System.out.println("333");
        return f + 3;
    });
    third.whenComplete((value, failure) -> {
        if (failure != null) {
            return;
        }
        System.out.println("----计算结果: " + value);
    }).exceptionally(failure -> {
        failure.printStackTrace();
        System.out.println(failure.getMessage());
        return null;
    });
    System.out.println(Thread.currentThread().getName() + "----主线程先去忙其它任务");
    threadPool.shutdown();
}
/**
 * Supplies 1, adds 2, adds 3, and hands the final value to System.out.println via thenAccept.
 */
public static void main(String[] args) {
    CompletableFuture
            .supplyAsync(() -> 1)
            .thenApply(f -> f + 2)
            .thenApply(f -> f + 3)
            .thenAccept(System.out::println);
}
/**
 * Runs four chained tasks, each printing its label and the name of the thread executing it,
 * waits up to two seconds for the last one, and always shuts the pool down.
 */
public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    try {
        // Task 1 is started on the five-thread pool and yields a string.
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
            return "abcd";
        }, threadPool);
        // Task 2 is attached with thenRunAsync and no executor argument.
        CompletableFuture<Void> second = first.thenRunAsync(() -> {
            try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
        });
        // Tasks 3 and 4 are attached with thenRun.
        CompletableFuture<Void> third = second.thenRun(() -> {
            try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
        });
        CompletableFuture<Void> completableFuture = third.thenRun(() -> {
            try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
        });
        System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}
/**
 * Starts two asynchronous tasks with different pauses (three seconds and one second), joins
 * them with applyToEither and prints the resulting message.
 */
public static void main(String[] args) {
    // Start the two asynchronous tasks
    CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
        System.out.println("A come in");
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        return "playA";
    });
    CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
        System.out.println("B come in");
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return "playB";
    });
    // Compare the two tasks: the function is applied to the result of whichever finishes first
    CompletableFuture<String> result = playA.applyToEither(playB, f -> f + " is winer");
    String announcement = Thread.currentThread().getName() + "\t" + "-----: " + result.join();
    System.out.println(announcement);
}
/**
 * Starts two asynchronous tasks yielding 10 (after one second) and 20 (after two seconds),
 * merges them with thenCombine and prints the sum.
 */
public static void main(String[] args) {
    CompletableFuture<Integer> tenLater = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "\t ---启动");
        // Pause the thread for a few seconds
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return 10;
    });
    CompletableFuture<Integer> twentyLater = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "\t ---启动");
        // Pause the thread for a few seconds
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
        return 20;
    });
    // Merge the two results by adding them.
    CompletableFuture<Integer> result = tenLater.thenCombine(twentyLater, (x, y) -> {
        System.out.println("-----开始两个结果合并");
        return x + y;
    });
    Integer merged = result.join();
    System.out.println(merged);
}
/**
 * Combines three asynchronous values (10, 20, 30) with two chained thenCombine steps; every
 * step prints the name of the thread executing it, and the final sum is printed after joining.
 *
 * <p>None of the supplyAsync calls is given an executor, so this method creates no thread pool
 * of its own.
 */
private static void interfaceChain() {
    CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + "\t"+"come in 1");
        return 10;
    }).thenCombine(CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "\t"+"come in 2");
        return 20;
    }),(x,y)->{
        // First merge: 10 + 20
        System.out.println(Thread.currentThread().getName() + "\t"+"x + y = a =" +(x+y));
        return x + y ;
    }).thenCombine(CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "\t"+"come in 3");
        return 30;
    }),(a,b)->{
        // Second merge: previous sum + 30
        System.out.println(Thread.currentThread().getName() + "\t"+"a + b = " +(a+b));
        return a+b;
    });
    System.out.println("---主线程结束,END");
    System.out.println(thenCombineResult.join());
}