前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ExecutorCompletionService 源码分析

ExecutorCompletionService 源码分析

作者头像
java404
发布2018-05-18 11:59:17
6430
发布2018-05-18 11:59:17
举报
文章被收录于专栏:java 成神之路

概要

在ExecutorService的submit方法中可以获取返回值,通过Future的get方法,但是这个Future类存在缺陷,Future接口调用get()方法取得处理后的返回结果时具有阻塞性,也就是说调用Future的get方法时,任务没有执行完成,则get方法要一直阻塞等到任务完成为止。 这样大大的影响了系统的性能,这就是Future的最大缺点。为此,java1.5以后提供了CompletionServlice来解决这个问题。

CompletionService 接口CompletionService的功能是异步的方式,一边生产任务,一边处理完成的任务结果,这样可以将执行的任务与处理任务隔离开来进行处理,使用submit执行任务,使用塔克获取已完成的任务,并按照这些任务的完成的时间顺序来处理他们的结果。

示例

向ExecutorService 提交一组任务,哪个任务先完成,就把完成任务的返回结果打印出来。

代码语言:javascript
复制
public class CompletionServiceExecutorDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        // 同时运行多个任务,那个任务先返回数据,就先获取该数据
        CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);
        for (int i = 1; i <= 10; i++) {
            final int seq = i;
            completionService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    int waitTime = new Random().nextInt(10);
                    TimeUnit.SECONDS.sleep(waitTime);
                    return "callable:"+seq+" 执行时间:"+waitTime+"s";
                }
            });
        }

        for (int i = 1; i <= 10; i++) {
            try {
                Future<String> future = completionService.take();
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }
}

执行结果如下:

代码语言:javascript
复制
callable:6 执行时间:1s
callable:2 执行时间:3s
callable:10 执行时间:3s
callable:1 执行时间:4s
callable:4 执行时间:5s
callable:8 执行时间:5s
callable:7 执行时间:7s
callable:5 执行时间:8s
callable:9 执行时间:9s
callable:3 执行时间:9s

从打印结果可以看出,这些任务是按照任务执行完成的顺序打印的,先执行完就先返回结果。

ExecutorCompletionService 源码分析

ExecutorCompletionService 类结构如下

代码语言:javascript
复制
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;  //线程池
    private final AbstractExecutorService aes;  
    private final BlockingQueue<Future<V>> completionQueue;  //任务完成队列


    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

ExecutorCompletionService 类中定义了一个QueueingFuture 的内部类,继承于FutureTask类,内部重写了FutureTask的done方法,该方法是在FutureTask任务执行完成后会调用的方法,在FutureTask中该方法未实现任何逻辑。

重写done方法,在任务处理完成后把该FutureTask任务放入到阻塞队列(BlockingQueue)中,然后我们就可以从阻塞队列中take执行完成的任务,进行想用的处理。

这里是实现ExecutorCompletionService的核心逻辑。

newTaskFor 方法
代码语言:javascript
复制
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

ExecutorCompletionService 支持Callable和Runnable任务

  1. 把用户提交的Callable任务转成FutureTask。
  2. 把用户提交的Runnable任务转成FutureTask。
ExecutorCompletionService 构造方法1
代码语言:javascript
复制
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
  1. 连接池(executor)不能为空。
  2. 判断该线程池是否AbstractExecutorService类型,如果是则赋值给aes,否则赋值null (aes作用:把用户提交的Callable和Runnable任务转换成FutureTask)
  3. 创建一个阻塞队列。(存放执行完成的FutureTask任务)
ExecutorCompletionService 构造方法2
代码语言:javascript
复制
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

该构造可以指定一个阻塞队列,其它功能同上构造方法。

submit方法
代码语言:javascript
复制
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
 
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

该方法可以向ExecutorCompletionService 中提交要执行的任务。 支持Callable和Runnable两种类型的任务。 如果提交的Runnable任务,则执行完后返回的结果为null。

代码语言:javascript
复制
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

从阻塞队列中获取执行完成的任务的,如果队列为空且任务没有全部完成,则阻塞当前线程,直到有任务执行完成。

代码语言:javascript
复制
    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

ExecutorCompletionService支持非阻塞方式从阻塞队列中获取已完成的任务

  1. 可以通过poll方法来从阻塞队列中获取任务,如果队列为空,则直接返回null,不会阻塞当前线程。 2.支持等待多长时间来从阻塞队列中获取已经完成的任务。

总结

ExecutorCompletionService的实现原理是内部使用了FutureTask来实现异步的任务执行。通过一个内部类继承FutureTask,并实现了FutureTask的一个done方法。该done方法会在任务执行完成之后调用该方法,在任务执行完之后把当前的FutureTask放入到阻塞队列中。这样就实现了先执行完成的任务先存放到阻塞队列中,应用程序可以从阻塞队列中提前获取先执行完的任务。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.07.18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概要
    • 示例
    • ExecutorCompletionService 源码分析
      • newTaskFor 方法
        • ExecutorCompletionService 构造方法1
          • ExecutorCompletionService 构造方法2
            • submit方法
            • 总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档