前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >看完,你也能用多线程让接口提升5倍速!

看完,你也能用多线程让接口提升5倍速!

作者头像
林老师带你学编程
发布2021-12-07 19:15:29
5130
发布2021-12-07 19:15:29
举报
文章被收录于专栏:强仔仔强仔仔

一、应用场景

酒店提供给各个渠道商房间价格是不一样的,我们需要轮询所有的渠道商接口,给用户返回一个最低的价格,前端会将这个价格显示给用户。

二、接口要求

实时查询去哪儿、携程、飞猪、艺龙、同程等渠道的今日房价,计算并返回当日最低价。

三、接口难点

去哪儿、携程、飞猪、艺龙、同程因为是走外网,网络会有一定的延迟。如果同步一个个获取价格数据,接口会很慢。

四、实现

4.1 简单实现

循环调用各个渠道今日房价,对比每个渠道价格,计算并返回最低价格。

代码语言:javascript
复制
package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class MainDemo {
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        String minPriceChannel = "";
        for (String channel : channelList) {
            CallableChannelPrice callableChannelPrice = new CallableChannelPrice(channel);
            int channelPrice = callableChannelPrice.getPrice();
            // 将第一个值赋值给minPrice或者后者价格小于当前价格
            if (minPrice == -1 || (minPrice != -1 && minPrice > channelPrice)) {
                minPrice = channelPrice;
                minPriceChannel = channel;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("最低的渠道为:" + minPriceChannel + ",价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
    }

    static class CallableChannelPrice {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }
    }
}
代码语言:javascript
复制
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:飞猪渠道的价格,价格为:122
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低的渠道为:飞猪,价格为:122,执行时间为:25003

循环调用有一个很明显的缺点,就是速度非常慢,接口响应的速度等于各个渠道接口的总和,这明显不符合我们的要求。

4.2 多线程Future/FutureTask实现

解决这类问题最有效的办法就是采用多线程并发执行,然后获取各自结果集来计算最终的价格。我们接下来看看Future和FutureTask的实现案例,以及它到底可以节省多少时间呢。

代码语言:javascript
复制
package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class FutureDemo {

    public static void main(String[] args) {
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        try {
            //结果集
            List<Integer> list = new ArrayList<>();
            List<Future<Integer>> futureList = new ArrayList<>();
            //1.高速提交5个任务,每个任务返回一个Future入list
            for (String channel : channelList) {
                futureList.add(exs.submit(new CallableChannelPrice(channel)));
            }
            //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<Future<Integer>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    //如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        Integer i = future.get();
                        list.add(i);
                        //任务完成移除任务
                        iterable.remove();
                    }
                }
            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}
代码语言:javascript
复制
获取:同程渠道的价格,价格为:195
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
最低渠道的价格为:122,执行时间为:5004

FutureTask实现的版本如下所示:

代码语言:javascript
复制
package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class FutureTaskDemo {

    public static void main(String[] args) {
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        try {
            //结果集
            List<Integer> list = new ArrayList<>();
            List<FutureTask<Integer>> futureList = new ArrayList<>();
            //1.高速提交5个任务,每个任务返回一个Future入list
            for (String channel : channelList) {
                FutureTask<Integer> futureTask = new FutureTask<>((new CallableChannelPrice(channel)));
                //提交任务,添加返回,Runnable特性
                exs.submit(futureTask);
                //Future特性
                futureList.add(futureTask);
            }
            //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<FutureTask<Integer>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    //如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        Integer i = future.get();
                        list.add(i);
                        //任务完成移除任务
                        iterable.remove();
                    }
                }
            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}

从执行结果上面我们可以看到,采用多线程并发执行方式,执行时间取决于最长的渠道接口。从接口响应的时间来看,效果还是非常显著的,直接从25s降低为5s。

多线程虽然解决网络问题,速度大幅度提升,但是CPU高速轮询,耗资源,代码也不够简洁。我们接着来看看CompletionService的实现。

4.3 多线程CompletionService实现

代码语言:javascript
复制
package com.fourkmiles.common.thread;


import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class CompletionServiceDemo {

    public static void main(String[] args) {
        //开启3个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            long startTime = System.currentTimeMillis();
            List<String> channelList = new ArrayList<>();
            channelList.add("去哪儿");
            channelList.add("携程");
            channelList.add("飞猪");
            channelList.add("艺龙");
            channelList.add("同程");
            // 初始值
            int minPrice = -1;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for (String channel : channelList) {
                futureList.add(completionService.submit(new CallableChannelPrice(channel)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
            for (Future<Integer> future : futureList) {
                //线程在这里阻塞等待该任务执行完毕,按照
                Integer result = future.get();
                list.add(result);
            }
           //方法2.使用内部阻塞队列的take()
//            for (int i = 0; i < channelList.size(); i++) {
//                //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
//                Integer result = completionService.take().get();
//                list.add(result);
//            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格大于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}
代码语言:javascript
复制
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低渠道的价格为:122,执行时间为:5003

CompletableFuture解决了Future/FutureTask版本中CPU高速轮询,耗资源问题,在JDK1.8之前推荐使用,但是提供API不够丰富,没办法框架自己捕捉异常。

4.4 多线程CompletableFuture实现

代码语言:javascript
复制
package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 添加顺序结果集
        List<Integer> addSequenceList;
        // 结果顺序结果集
        List<Integer> executionSequenceList = new ArrayList<>();
        //定长5线程池
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        List<CompletableFuture<Integer>> futureList = new ArrayList<>();
        try {
            //方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
            for (int i = 0; i < channelList.size(); i++) {
                final int j = i;
                //异步执行
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getPrice(channelList.get(j)), exs)
                        //Integer转换字符串    thenAccept只接受不返回不影响结果
                        .thenApply(e -> Integer.valueOf(e))
                        //如需获取任务完成先后顺序,此处代码即可
                        .whenComplete((v, e) -> {
                            executionSequenceList.add(v);
                        });
                futureList.add(future);
            }
            //流式获取结果:此处是根据任务添加顺序获取的结果
            addSequenceList = sequence(futureList).get();

//            方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture[] cfs = channelList.stream().map(object -> CompletableFuture.supplyAsync(() -> getPrice(object), exs)
                    .thenApply(h -> Integer.valueOf(h))
                    //如需获取任务完成先后顺序,此处代码即可
                    .whenComplete((v, e) -> {
                        executionSequenceList.add(v);
                    })).toArray(CompletableFuture[]::new);
//            等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
            CompletableFuture.allOf(cfs).join();
            for (Integer price : addSequenceList) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    public static int getPrice(String channel) {
        int price = 0;
        try {
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
        return price;
    }

    /**
     * @param futures List
     * @return
     * @Description 组合多个CompletableFuture为一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @author diandian.zhang
     * @date 2017年6月19日下午3:01:09
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        //1.构造一个空CompletableFuture,子任务数为入参任务list size
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    /**
     * @param futures Stream
     * @return
     * @Description Stream流式类型futures转换成一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @author diandian.zhang
     * @date 2017年6月19日下午6:23:40
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
        return sequence(futureList);
    }
}
代码语言:javascript
复制
获取:去哪儿渠道的价格,价格为:150
获取:飞猪渠道的价格,价格为:122
获取:同程渠道的价格,价格为:195
获取:艺龙渠道的价格,价格为:138
获取:携程渠道的价格,价格为:141
最低渠道的价格为:122,执行时间为:5049

Java8流式编程,提供丰富的API接口,可以捕获异常,JDK8必选方案,相对于上面几种方案优势明显,强烈推荐使用。

CompletableFuture上面demo只介绍了一部分的内容,如果大家想要了解更多关于CompletableFuture的用法,可以查看JDK中java.util.concurrent.CompletableFuture源码。

五、总结:

从上面的实验我们可以得出,采用CompletableFuture进行多线程并发获取结果集是最好的方案。大家可能会很好奇,既然有了CompletableFuture,为啥还需要前面几种方案呢。那是因为CompletableFuture是在JDK8才出现的,以前还没有这个工具。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档