一、应用场景
酒店提供给各个渠道商房间价格是不一样的,我们需要轮询所有的渠道商接口,给用户返回一个最低的价格,前端会将这个价格显示给用户。
二、接口要求
实时查询去哪儿、携程、飞猪、艺龙、同程等渠道的今日房价,计算并返回当日最低价。
三、接口难点
去哪儿、携程、飞猪、艺龙、同程因为是走外网,网络会有一定的延迟。如果同步一个个获取价格数据,接口会很慢。
四、实现
4.1 简单实现
循环调用各个渠道今日房价,对比每个渠道价格,计算并返回最低价格。
package com.fourkmiles.common.thread;
import java.util.ArrayList;
import java.util.List;
/**
* @author 林志强
* @date 2021/3/31
*/
public class MainDemo {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
List<String> channelList = new ArrayList<>();
channelList.add("去哪儿");
channelList.add("携程");
channelList.add("飞猪");
channelList.add("艺龙");
channelList.add("同程");
// 初始值
int minPrice = -1;
String minPriceChannel = "";
for (String channel : channelList) {
CallableChannelPrice callableChannelPrice = new CallableChannelPrice(channel);
int channelPrice = callableChannelPrice.getPrice();
// 将第一个值赋值给minPrice或者后者价格小于当前价格
if (minPrice == -1 || (minPrice != -1 && minPrice > channelPrice)) {
minPrice = channelPrice;
minPriceChannel = channel;
}
}
long endTime = System.currentTimeMillis();
System.out.println("最低的渠道为:" + minPriceChannel + ",价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
}
static class CallableChannelPrice {
String channel;
public CallableChannelPrice(String channel) {
this.channel = channel;
}
public int getPrice() throws Exception {
int price = 0;
// 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
if ("去哪儿".equals(channel)) {
// 模拟请求去哪儿接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 150;
} else if ("携程".equals(channel)) {
// 模拟请求携程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 141;
} else if ("飞猪".equals(channel)) {
// 模拟请求飞猪接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 122;
} else if ("艺龙".equals(channel)) {
// 模拟请求艺龙接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 138;
} else if ("同程".equals(channel)) {
// 模拟请求同程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 195;
}
System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
return price;
}
}
}
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:飞猪渠道的价格,价格为:122
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低的渠道为:飞猪,价格为:122,执行时间为:25003
循环调用有一个很明显的缺点,就是速度非常慢,接口响应的速度等于各个渠道接口的总和,这明显不符合我们的要求。
4.2 多线程Future/FutureTask实现
解决这类问题最有效的办法就是采用多线程并发执行,然后获取各自结果集来计算最终的价格。我们接下来看看Future和FutureTask的实现案例,以及它到底可以节省多少时间呢。
package com.fourkmiles.common.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author 林志强
* @date 2021/3/31
*/
public class FutureDemo {
public static void main(String[] args) {
//开启多线程
ExecutorService exs = Executors.newFixedThreadPool(5);
long startTime = System.currentTimeMillis();
List<String> channelList = new ArrayList<>();
channelList.add("去哪儿");
channelList.add("携程");
channelList.add("飞猪");
channelList.add("艺龙");
channelList.add("同程");
// 初始值
int minPrice = -1;
try {
//结果集
List<Integer> list = new ArrayList<>();
List<Future<Integer>> futureList = new ArrayList<>();
//1.高速提交5个任务,每个任务返回一个Future入list
for (String channel : channelList) {
futureList.add(exs.submit(new CallableChannelPrice(channel)));
}
//2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
while (futureList.size() > 0) {
Iterator<Future<Integer>> iterable = futureList.iterator();
//遍历一遍
while (iterable.hasNext()) {
Future<Integer> future = iterable.next();
//如果任务完成取结果,否则判断下一个任务是否完成
if (future.isDone() && !future.isCancelled()) {
//获取结果
Integer i = future.get();
list.add(i);
//任务完成移除任务
iterable.remove();
}
}
}
for (Integer price : list) {
// 将第一个值赋值给minPrice或者后者价格小于当前价格
if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
minPrice = price;
}
}
long endTime = System.currentTimeMillis();
System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
static class CallableChannelPrice implements Callable<Integer> {
String channel;
public CallableChannelPrice(String channel) {
this.channel = channel;
}
public int getPrice() throws Exception {
int price = 0;
// 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
if ("去哪儿".equals(channel)) {
// 模拟请求去哪儿接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 150;
} else if ("携程".equals(channel)) {
// 模拟请求携程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 141;
} else if ("飞猪".equals(channel)) {
// 模拟请求飞猪接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 122;
} else if ("艺龙".equals(channel)) {
// 模拟请求艺龙接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 138;
} else if ("同程".equals(channel)) {
// 模拟请求同程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 195;
}
System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
return price;
}
@Override
public Integer call() throws Exception {
return getPrice();
}
}
}
获取:同程渠道的价格,价格为:195
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
最低渠道的价格为:122,执行时间为:5004
FutureTask实现的版本如下所示:
package com.fourkmiles.common.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 林志强
* @date 2021/3/31
*/
public class FutureTaskDemo {
public static void main(String[] args) {
//开启多线程
ExecutorService exs = Executors.newFixedThreadPool(5);
long startTime = System.currentTimeMillis();
List<String> channelList = new ArrayList<>();
channelList.add("去哪儿");
channelList.add("携程");
channelList.add("飞猪");
channelList.add("艺龙");
channelList.add("同程");
// 初始值
int minPrice = -1;
try {
//结果集
List<Integer> list = new ArrayList<>();
List<FutureTask<Integer>> futureList = new ArrayList<>();
//1.高速提交5个任务,每个任务返回一个Future入list
for (String channel : channelList) {
FutureTask<Integer> futureTask = new FutureTask<>((new CallableChannelPrice(channel)));
//提交任务,添加返回,Runnable特性
exs.submit(futureTask);
//Future特性
futureList.add(futureTask);
}
//2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
while (futureList.size() > 0) {
Iterator<FutureTask<Integer>> iterable = futureList.iterator();
//遍历一遍
while (iterable.hasNext()) {
Future<Integer> future = iterable.next();
//如果任务完成取结果,否则判断下一个任务是否完成
if (future.isDone() && !future.isCancelled()) {
//获取结果
Integer i = future.get();
list.add(i);
//任务完成移除任务
iterable.remove();
}
}
}
for (Integer price : list) {
// 将第一个值赋值给minPrice或者后者价格小于当前价格
if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
minPrice = price;
}
}
long endTime = System.currentTimeMillis();
System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
static class CallableChannelPrice implements Callable<Integer> {
String channel;
public CallableChannelPrice(String channel) {
this.channel = channel;
}
public int getPrice() throws Exception {
int price = 0;
// 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
if ("去哪儿".equals(channel)) {
// 模拟请求去哪儿接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 150;
} else if ("携程".equals(channel)) {
// 模拟请求携程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 141;
} else if ("飞猪".equals(channel)) {
// 模拟请求飞猪接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 122;
} else if ("艺龙".equals(channel)) {
// 模拟请求艺龙接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 138;
} else if ("同程".equals(channel)) {
// 模拟请求同程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 195;
}
System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
return price;
}
@Override
public Integer call() throws Exception {
return getPrice();
}
}
}
从执行结果上面我们可以看到,采用多线程并发执行方式,执行时间取决于最长的渠道接口。从接口响应的时间来看,效果还是非常显著的,直接从25s降低为5s。
多线程虽然解决网络问题,速度大幅度提升,但是CPU高速轮询,耗资源,代码也不够简洁。我们接着来看看CompletionService的实现。
4.3 多线程CompletionService实现
package com.fourkmiles.common.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* @author 林志强
* @date 2021/3/31
*/
public class CompletionServiceDemo {
public static void main(String[] args) {
//开启3个线程
ExecutorService exs = Executors.newFixedThreadPool(5);
try {
long startTime = System.currentTimeMillis();
List<String> channelList = new ArrayList<>();
channelList.add("去哪儿");
channelList.add("携程");
channelList.add("飞猪");
channelList.add("艺龙");
channelList.add("同程");
// 初始值
int minPrice = -1;
//结果集
List<Integer> list = new ArrayList<>();
//1.定义CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
List<Future<Integer>> futureList = new ArrayList<>();
//2.添加任务
for (String channel : channelList) {
futureList.add(completionService.submit(new CallableChannelPrice(channel)));
}
//==================结果归集===================
//方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
for (Future<Integer> future : futureList) {
//线程在这里阻塞等待该任务执行完毕,按照
Integer result = future.get();
list.add(result);
}
//方法2.使用内部阻塞队列的take()
// for (int i = 0; i < channelList.size(); i++) {
// //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
// Integer result = completionService.take().get();
// list.add(result);
// }
for (Integer price : list) {
// 将第一个值赋值给minPrice或者后者价格大于当前价格
if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
minPrice = price;
}
}
long endTime = System.currentTimeMillis();
System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程池
exs.shutdown();
}
}
static class CallableChannelPrice implements Callable<Integer> {
String channel;
public CallableChannelPrice(String channel) {
this.channel = channel;
}
public int getPrice() throws Exception {
int price = 0;
// 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
if ("去哪儿".equals(channel)) {
// 模拟请求去哪儿接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 150;
} else if ("携程".equals(channel)) {
// 模拟请求携程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 141;
} else if ("飞猪".equals(channel)) {
// 模拟请求飞猪接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 122;
} else if ("艺龙".equals(channel)) {
// 模拟请求艺龙接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 138;
} else if ("同程".equals(channel)) {
// 模拟请求同程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 195;
}
System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
return price;
}
@Override
public Integer call() throws Exception {
return getPrice();
}
}
}
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低渠道的价格为:122,执行时间为:5003
CompletableFuture解决了Future/FutureTask版本中CPU高速轮询,耗资源问题,在JDK1.8之前推荐使用,但是提供API不够丰富,没办法框架自己捕捉异常。
4.4 多线程CompletableFuture实现
package com.fourkmiles.common.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author 林志强
* @date 2021/3/31
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
// 添加顺序结果集
List<Integer> addSequenceList;
// 结果顺序结果集
List<Integer> executionSequenceList = new ArrayList<>();
//定长5线程池
ExecutorService exs = Executors.newFixedThreadPool(5);
long startTime = System.currentTimeMillis();
List<String> channelList = new ArrayList<>();
channelList.add("去哪儿");
channelList.add("携程");
channelList.add("飞猪");
channelList.add("艺龙");
channelList.add("同程");
// 初始值
int minPrice = -1;
List<CompletableFuture<Integer>> futureList = new ArrayList<>();
try {
//方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
for (int i = 0; i < channelList.size(); i++) {
final int j = i;
//异步执行
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getPrice(channelList.get(j)), exs)
//Integer转换字符串 thenAccept只接受不返回不影响结果
.thenApply(e -> Integer.valueOf(e))
//如需获取任务完成先后顺序,此处代码即可
.whenComplete((v, e) -> {
executionSequenceList.add(v);
});
futureList.add(future);
}
//流式获取结果:此处是根据任务添加顺序获取的结果
addSequenceList = sequence(futureList).get();
// 方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = channelList.stream().map(object -> CompletableFuture.supplyAsync(() -> getPrice(object), exs)
.thenApply(h -> Integer.valueOf(h))
//如需获取任务完成先后顺序,此处代码即可
.whenComplete((v, e) -> {
executionSequenceList.add(v);
})).toArray(CompletableFuture[]::new);
// 等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
for (Integer price : addSequenceList) {
// 将第一个值赋值给minPrice或者后者价格小于当前价格
if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
minPrice = price;
}
}
long endTime = System.currentTimeMillis();
System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
public static int getPrice(String channel) {
int price = 0;
try {
// 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
if ("去哪儿".equals(channel)) {
// 模拟请求去哪儿接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 150;
} else if ("携程".equals(channel)) {
// 模拟请求携程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 141;
} else if ("飞猪".equals(channel)) {
// 模拟请求飞猪接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 122;
} else if ("艺龙".equals(channel)) {
// 模拟请求艺龙接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 138;
} else if ("同程".equals(channel)) {
// 模拟请求同程接口的网络延迟,休眠5秒
Thread.sleep(5000);
price = 195;
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
return price;
}
/**
* @param futures List
* @return
* @Description 组合多个CompletableFuture为一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
* @author diandian.zhang
* @date 2017年6月19日下午3:01:09
* @since JDK1.8
*/
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
//1.构造一个空CompletableFuture,子任务数为入参任务list size
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
//2.流式(总任务完成后,每个子任务join取结果,后转换为list)
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
/**
* @param futures Stream
* @return
* @Description Stream流式类型futures转换成一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
* @author diandian.zhang
* @date 2017年6月19日下午6:23:40
* @since JDK1.8
*/
public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
return sequence(futureList);
}
}
获取:去哪儿渠道的价格,价格为:150
获取:飞猪渠道的价格,价格为:122
获取:同程渠道的价格,价格为:195
获取:艺龙渠道的价格,价格为:138
获取:携程渠道的价格,价格为:141
最低渠道的价格为:122,执行时间为:5049
Java8流式编程,提供丰富的API接口,可以捕获异常,JDK8必选方案,相对于上面几种方案优势明显,强烈推荐使用。
CompletableFuture上面demo只介绍了一部分的内容,如果大家想要了解更多关于CompletableFuture的用法,可以查看JDK中java.util.concurrent.CompletableFuture源码。
五、总结:
从上面的实验我们可以得出,采用CompletableFuture进行多线程并发获取结果集是最好的方案。大家可能会很好奇,既然有了CompletableFuture,为啥还需要前面几种方案呢。那是因为CompletableFuture是在JDK8才出现的,以前还没有这个工具。