前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并行执行任务

并行执行任务

作者头像
LiosWong
发布2018-10-29 17:45:18
6790
发布2018-10-29 17:45:18
举报
文章被收录于专栏:后端沉思录后端沉思录

需求

在app列表首页,展示多个item,并有分页;而每个item里后台都会调用一个http请求,判断当前item的状态

分析

为了更好的用体验,无疑需要使用多线程并行处理http请求,而且还需要拿到每个线程的执行结果. 上面的分析,有两个问题需要解决: 1. 如何创建线程池 2. 如何拿到所有线程的执行结果 对于第一个问题,还是很好解决的,使用并发包( java.util.concurrent)下面的ThreadPoolExecutor类创建线程池,阿里巴巴Java开发手册上推荐使用该类创建线程池:

,根据统计,该首页的qps最大为3以及服务器的配置后,线程池创建如下:

protected static class ThreadFactory {
        static ThreadPoolExecutor executor = new ThreadPoolExecutor(ThreadNumEnum.CORE_POOL_SIZE.getNum(), ThreadNumEnum.MAX_IMUM_POOL_SIZE.getNum(), TimeOutEnum.FiveSecond.getSeconds(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
        private ThreadFactory() {

        }
        public static ExecutorService getThreadPool() {
            return executor;
        }
    }

如何能拿到线程的执行结果呢,传统的Thread无法拿到执行结果,由于run方法无返回值,通过ThreadPoolExecutor类图发现:

继承了AbstractExecutorService、ExecutorService,对ExecutorService中的invokeAll方法产生极大的兴趣,仔细阅读注释,其实这个方法用来并行执行任务:

/**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Upon return, tasks that have not completed are cancelled.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list. If the operation did not time out,
     *         each task will have completed. If it did time out, some
     *         of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks, any of its elements, or
     *         unit are {@code null}
     * @throws RejectedExecutionException if any task cannot be scheduled
     *         for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

到这里,第二个问题的解决思路已经有了.

编码实现

invokeAll方法的入参分别为任务列表list、超时时间、时间单位,所以首先我们需要创建任务列表: List<BasicUserFilter>list=newArrayList<>(); 超时时间为每个FutureTask执行超时时间,这里设置成3s,这里的3s超时时间是针对的所有tasks,而不是单个task的超时时间,如果超时,会取消没有执行完的所有任务,并抛出超时异常,源码如下:

for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }

BasicUserFilter需要实现Callable接口,因为在方法call里能拿到线程的执行结果, 下面就是并行执行任务了:

ExecutorService executor = ThreadFactory.getThreadPool();
        List<XXX> userFilterDtoList = new ArrayList<>();
        try {
            List<Future<XXX>> futureList = executor.invokeAll(list, TimeOutEnum.FourSecond.getSeconds(), TimeUnit.MILLISECONDS);
            futureList.stream().forEach(p -> {
                try {
                    Future<XXX> filterDtoFuture = p;
                   //拿到线程执行结果  
                   userFilterDtoList.add(filterDtoFuture.get());
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                } catch (ExecutionException e) {
                    logger.error(e.getMessage(), e);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            });
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

关于多线程笔者还有很多需要去学习,上面是一个工作笔记,关于invokeAll的执行流程、神奇的Future模式,感兴趣的可以阅读源码就能找到答案.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档