前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深度解读 java 线程池设计思想及源码实现

深度解读 java 线程池设计思想及源码实现

作者头像
Java技术江湖
发布2019-09-25 14:09:06
6400
发布2019-09-25 14:09:06
举报
文章被收录于专栏:微信公众号【Java技术江湖】

转自:https://javadoop.com/2017/09/05/java-thread-pool

我相信大家都看过很多的关于线程池的文章,基本上也是面试必问的,好像我写这篇文章其实是没有什么意义的,不过,我相信你也和我一样,看了很多文章还是一知半解,甚至可能看了很多瞎说的文章。希望大家看过这篇文章以后,就可以完全掌握 Java 线程池了。

我发现好些人都是因为这篇文章来到本站的,希望这篇让人留下第一眼印象的文章能给你带来收获。

本文一大重点是源码解析,不过线程池设计思想以及作者实现过程中的一些巧妙用法是我想传达给读者的。本文还是会一行行关键代码进行分析,目的是为了让那些自己看源码不是很理解的同学可以得到参考。

线程池是非常重要的工具,如果你要成为一个好的工程师,还是得比较好地掌握这个知识。即使你为了谋生,也要知道,这基本上是面试必问的题目,而且面试官很容易从被面试者的回答中捕捉到被面试者的技术水平。

本文略长,建议在 pc 上阅读,边看文章边翻源码(Java7 和 Java8 都一样),建议想好好看的读者抽出至少 15 至 30 分钟的整块时间来阅读。当然,如果读者仅为面试准备,可以直接滑到最后的总结部分。

目录

  • 总览
  • Executor 接口
  • ExecutorService
  • FutureTask
  • AbstractExecutorService
  • ThreadPoolExecutor
  • Executors
  • 总结

总览

开篇来一些废话。下图是 java 线程池几个相关类的继承结构:

先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) 接口方法定义。

ExecutorService 也是接口,在 Executor 接口的基础上添加了很多的接口方法,所以一般来说我们会使用这个接口。

然后再下来一层是 AbstractExecutorService,从名字我们就知道,这是抽象类,这里实现了非常有用的一些方法供子类直接使用,之后我们再细说。

然后才到我们的重点部分 ThreadPoolExecutor 类,这个类提供了关于线程池所需的非常丰富的功能。

另外,我们还涉及到下图中的这些类:

同在并发包中的 Executors 类,类名中带字母 s,我们猜到这个是工具类,里面的方法都是静态方法,如以下我们最常用的用于生成 ThreadPoolExecutor 的实例的一些方法:

代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

另外,由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程中,我们是往线程池提交任务(task),使用过线程池的都知道,我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。这样,读者才能比较容易记住 FutureTask 这个类名:它首先是一个任务(Task),然后具有 Future 接口的语义,即可以在将来(Future)得到执行的结果。

当然,线程池中的 BlockingQueue 也是非常重要的概念,如果线程数达到 corePoolSize,我们的每个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 通常我们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每个实现类都有不同的特征,使用场景之后会慢慢分析。想要详细了解各个 BlockingQueue 的读者,可以参考我的前面的一篇对 BlockingQueue 的各个实现类进行详细分析的文章。

把事情说完整:除了上面说的这些类外,还有一个很重要的类,就是定时任务实现类 ScheduledThreadPoolExecutor,它继承自本文要重点讲解的 ThreadPoolExecutor,用于实现定时执行。不过本文不会介绍它的实现,我相信读者看完本文后可以比较容易地看懂它的源码。

以上就是本文要介绍的知识,废话不多说,开始进入正文。

Executor 接口

代码语言:javascript
复制
/* 
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {
    void execute(Runnable command);
}

我们可以看到 Executor 接口非常简单,就一个 voidexecute(Runnablecommand) 方法,代表提交一个任务。为了让大家理解 java 线程池的整个设计方案,我会按照 Doug Lea 的设计思路来多说一些相关的东西。

我们经常这样启动一个线程:

代码语言:javascript
复制
new Thread(new Runnable(){
  // do something
}).start();

用了线程池 Executor 后就可以像下面这么使用:

代码语言:javascript
复制
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

如果我们希望线程池同步执行每一个任务,我们可以这么实现这个接口:

代码语言:javascript
复制
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();// 这里不是用的new Thread(r).start(),也就是说没有启动任何一个新的线程。
    }
}

我们希望每个任务提交进来后,直接启动一个新的线程来执行这个任务,我们可以这么实现:

代码语言:javascript
复制
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();  // 每个任务都用一个新的线程来执行
    }
}

我们再来看下怎么组合两个 Executor 来使用,下面这个实现是将所有的任务都加到一个 queue 中,然后从 queue 中取任务,交给真正的执行器执行,这里采用 synchronized 进行并发控制:

代码语言:javascript
复制
class SerialExecutor implements Executor {
    // 任务队列
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    // 这个才是真正的执行器
    final Executor executor;
    // 当前正在执行的任务
    Runnable active;

    // 初始化的时候,指定执行器
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    // 添加任务到线程池: 将任务添加到任务队列,scheduleNext 触发执行器去任务队列取任务
    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            // 具体的执行转给真正的执行器 executor
            executor.execute(active);
        }
    }
}

当然了,Executor 这个接口只有提交任务的功能,太简单了,我们想要更丰富的功能,比如我们想知道执行结果、我们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来我们要介绍的是继承自 Executor 接口的 ExecutorService 接口,这个接口提供了比较丰富的功能,也是我们最常使用到的接口。

ExecutorService

一般我们定义一个线程池的时候,往往都是使用这个接口:

代码语言:javascript
复制
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

因为这个接口中定义的一系列方法大部分情况下已经可以满足我们的需要了。

那么我们简单初略地来看一下这个接口中都有哪些方法:

代码语言:javascript
复制
public interface ExecutorService extends Executor {

    // 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    void shutdown();

    // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    // 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
    List<Runnable> shutdownNow();

    // 线程池是否已关闭
    boolean isShutdown();

    // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
    // 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
    boolean isTerminated();

    // 等待所有任务完成,并设置超时时间
    // 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
    // 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;

    // 提交一个 Callable 任务
    <T> Future<T> submit(Callable<T> task);

    // 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
    // 因为 Runnable 的 run 方法本身并不返回任何东西
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个 Runnable 任务
    Future<?> submit(Runnable task);

    // 执行所有任务,返回 Future 类型的一个 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;

    // 也是执行所有任务,但是这里设置了超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;

    // 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;

    // 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
    // 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

这些方法都很好理解,一个简单的线程池主要就是这些功能,能提交任务,能获取结果,能关闭线程池,这也是为什么我们经常用这个接口的原因。

FutureTask

在继续往下层介绍 ExecutorService 的实现类之前,我们先来说说相关的类 FutureTask。

代码语言:javascript
复制
Future   -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture

FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,
所以每个 Runnable 通常都先包装成 FutureTask,
然后调用 executor.execute(Runnable command) 将其提交给线程池

我们知道,Runnable 的 void run() 方法是没有返回值的,所以,通常,如果我们需要的话,会在 submit 中指定第二个参数作为返回值:

代码语言:javascript
复制
<T> Future<T> submit(Runnable task, T result);

其实到时候会通过这两个参数,将其包装成 Callable。

Callable 也是因为线程池的需要,所以才有了这个接口。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。

代码语言:javascript
复制
public interface Callable<V> {

    V call() throws Exception;
}

在这里,就不展开说 FutureTask 类了,因为本文篇幅本来就够大了,这里我们需要知道怎么用就行了。

下面,我们来看看 ExecutorService 的抽象实现 AbstractExecutorService

AbstractExecutorService

AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。

这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。定义于最上层接口 Executor中的 voidexecute(Runnablecommand) 由于不需要获取结果,不会进行 FutureTask 的包装。

需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。

下面,我将一行一行源码地来分析这个类,跟着源码来看看其实现吧:

Tips: invokeAny 和 invokeAll 方法占了这整个类的绝大多数篇幅,读者可以选择适当跳过,因为它们可能在你的实践中使用的频次比较低,而且它们不带有承前启后的作用,不用担心会漏掉什么导致看不懂后面的代码。

代码语言:javascript
复制
public abstract class AbstractExecutorService implements ExecutorService {

    // RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
    // 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // 提交任务
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1\. 将任务包装成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 2\. 交给执行器执行,execute 方法由具体的子类来实现
        // 前面也说了,FutureTask 间接实现了Runnable 接口。
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1\. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 2\. 交给执行器执行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1\. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 2\. 交给执行器执行
        execute(ftask);
        return ftask;
    }

    // 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
    // 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数,
    // 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        // 任务数
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

        // ExecutorCompletionService 不是一个真正的执行器,参数 this 才是真正的执行器
        // 它对执行器进行了包装,每个任务结束后,将结果保存到内部的一个 completionQueue 队列中
        // 这也是为什么这个类的名字里面有个 Completion 的原因吧。
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            // 用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交
            futures.add(ecs.submit(it.next()));
            // 提交了一个任务,所以任务数量减 1
            --ntasks;
            // 正在执行的任务数(提交的时候 +1,任务结束的时候 -1)
            int active = 1;

            for (;;) {
                // ecs 上面说了,其内部有一个 completionQueue 用于保存执行完成的结果
                // BlockingQueue 的 poll 方法不阻塞,返回 null 代表队列为空
                Future<T> f = ecs.poll();
                // 为 null,说明刚刚提交的第一个线程还没有执行完成
                // 在前面先提交一个任务,加上这里做一次检查,也是为了提高性能
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 这里是 else if,不是 if。这里说明,没有任务了,同时 active 为 0 说明
                    // 任务都执行完成了。其实我也没理解为什么这里做一次 break?
                    // 因为我认为 active 为 0 的情况,必然从下面的 f.get() 返回了

                    // 2018-02-23 感谢读者 newmicro 的 comment,
                    //  这里的 active == 0,说明所有的任务都执行失败,那么这里是 for 循环出口
                    else if (active == 0)
                        break;
                    // 这里也是 else if。这里说的是,没有任务了,但是设置了超时时间,这里检测是否超时
                    else if (timed) {
                        // 带等待的 poll 方法
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        // 如果已经超时,抛出 TimeoutException 异常,这整个方法就结束了
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 这里是 else。说明,没有任务需要提交,但是池中的任务没有完成,还没有超时(如果设置了超时)
                    // take() 方法会阻塞,直到有元素返回,说明有任务结束了
                    else
                        f = ecs.take();
                }
                /*
                 * 我感觉上面这一段并不是很好理解,这里简单说下。
                 * 1\. 首先,这在一个 for 循环中,我们设想每一个任务都没那么快结束,
                 *     那么,每一次都会进到第一个分支,进行提交任务,直到将所有的任务都提交了
                 * 2\. 任务都提交完成后,如果设置了超时,那么 for 循环其实进入了“一直检测是否超时”
                       这件事情上
                 * 3\. 如果没有设置超时机制,那么不必要检测超时,那就会阻塞在 ecs.take() 方法上,
                       等待获取第一个执行结果
                 * 4\. 如果所有的任务都执行失败,也就是说 future 都返回了,
                       但是 f.get() 抛出异常,那么从 active == 0 分支出去(感谢 newmicro 提出)
                         // 当然,这个需要看下面的 if 分支。
                 */

                // 有任务结束了
                if (f != null) {
                    --active;
                    try {
                        // 返回执行结果,如果有异常,都包装成 ExecutionException
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }// 注意看 for 循环的范围,一直到这里

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 方法退出之前,取消其他的任务
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    // 执行所有的任务,返回任务结果。
    // 先不要看这个方法,我们先想想,其实我们自己提交任务到线程池,也是想要线程池执行所有的任务
    // 只不过,我们是每次 submit 一个任务,这里以一个集合作为参数提交
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 这个很简单
            for (Callable<T> t : tasks) {
                // 包装成 FutureTask
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // 提交任务
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        // 这是一个阻塞方法,直到获取到值,或抛出了异常
                        // 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的
                        // 可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            // 这个方法返回,不像其他的场景,返回 List<Future>,其实执行结果还没出来
            // 这个方法返回是真正的返回,任务都结束了
            return futures;
        } finally {
            // 为什么要这个?就是上面说的有异常的情况
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    // 带超时的 invokeAll,我们找不同吧
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            Iterator<Future<T>> it = futures.iterator();
            // 提交一个任务,检测一次是否超时
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                // 超时
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        // 调用带超时的 get 方法,这里的参数 nanos 是剩余的时间,
                        // 因为上面其实已经用掉了一些时间了
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}

到这里,我们发现,这个抽象类包装了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-02-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java技术江湖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 总览
  • Executor 接口
  • ExecutorService
  • FutureTask
  • AbstractExecutorService
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档