专栏首页每天学JavaJava源码之ThreadPoolExecutor

Java源码之ThreadPoolExecutor

ThreadPoolExecutor类是线程池的基础,线程池的目的是为了减少了每个任务调用的开销,在拥有大量异步任务时可以增强的性能,并且还可以提供绑定和管理资源的方法

我在前面的一篇文章中说到了Executors这个类提供四种创建线程池的方法,但是其实质还是通过ThreadPoolExecutor来创建线程池。在阿里的开发手册中提到过,推荐使用ThreadPoolExecutor创建线程池,原因是了解线程池的创建,才能避免资源浪费。

01

构造器

ThreadPoolExecutor继承AbstractExecutorService抽象类,AbstractExecutorService提供了ExecutorService执行方法的默认实现。

public class ThreadPoolExecutor extends AbstractExecutorService 

ThreadPoolExecutor有四个构造器(实际上都是调用第四个构造方法)

第一个:这个构造方法提供默认的工厂方法和默认拒绝策略(AbortPolicy),也就是说我们只需要传核心线程数,最大线程数,非核心线程的超时时间,超时时间单位,任务队列

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

第二个:相对于第一种,这个构造方法多传了一个线程工厂方法

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

第三个:相对于第一种,这个构造方法多传递了拒绝策略

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

第四个:这是线程池的核心构造器,上面三个构造器核心仍是调用它。我们可以看一下代码:

一.如果核心线程数小于0,或者最大线程数小于0,或者最大线程数小于核心线程数,或者非核心线程存活时间小于0则抛出参数不合法的异常。

二.如果工作队列为null,或者线程方法为null,或者拒绝策略为null抛出空指针异常

三.this.acc不知道大家是否清楚(可以了解一下Java安全模型):System.getSecurityManager()返回一个安全管理器对象security,如果security不为null,ccessController.getContext()就比较快照上下文信息与本上下文信息,然后来做出对受控资源访问控制的决策。

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

上面就是线程池的四个构造器。通过这个四个构造器我们就可以创建线程池了。

扩展:

四种拒绝策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

队列有如下选择:

ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous

02

执行方法

如果我们通过第一步创建了一个线程池,那么下一步我们就是要使用线程池的线程去执行任务了,ThreadPoolExecutor提供了方法:execute(Runnable command),我们将任务传进来就可以了,我们看一下源码:源码中注释也有说明。

如果任务为null则抛出空指针异常

如果允许的线程小于核心线程,我们开启一个新的线程去执行第一个任务。使用addWorker方法进行检测允许状态和工作线程的数量,如果没有返回false我们就增加线程去处理任务。

如果一个任务成功的进入队列,在添加一个线城时仍需要进行双重检查(因为前一次检测后线程已经消亡)或者线程池shutdown了当我们进入这个方法的时候。所以我们需要再次检测状态,如果有需要,回滚队列的操作在停止的时候,或者当线程池没有线程时需要创建一个新线程。

如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shutdown或者已经饱和了,执行拒绝策略

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

关于ctl.get()

//利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态: private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

到这里我们会发现execute方法实质是将任务放入到addWorker中,那么addWorker是怎么样的逻辑呢?

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果当前线程池的状态时SHUTDOWN,STOP,TIDYING,TERMINATED并且为SHUTDOWN状态时任务队列为空,那么就返回false  原因:如果调用了shutdown方法,此时的线程池还会继续工作并且会在任务队列中的所有任务执行完成后才会结束线程池。
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //死循环
            for (;;) {
                int wc = workerCountOf(c);
                //core是在execute方法中传的参数,true表示 核心线程,false表示最大线程 
                 //CAPACITY  可以理解为Integer的最大值  1左移29位再-1
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //如果增加任务数量成功那么退出这个循环执行下面的代码,否则继续    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 初始化worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //同步块 使用内置锁 锁住
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                      //再判断一次当前线程池的状态  避免在执行过程中线程时被使用者关闭
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //向正在执行的任务队列(workers)中添加work   
                        // 区别一下:workqueue是等待执行的阻塞队列
                        workers.add(w);
                        int s = workers.size();
                        //记录曾经并发执行的最大任务个数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                            //添加任务成功
                        workerAdded = true;
                    }
                } finally {
                    //finally块释放内置锁    
                    mainLock.unlock();
                }
                //如果任务添加成功那么开始执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWoker方法中我们会发现Worker这个类将传入的任务启动。而Worker是ThreadPoolExector内部类,它继承了AQS抽象类,其重写了AQS的一些方法,并且其也可作为一个Runnable对象,从而可以创建线程Thread。

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

在它的构造器中:我们发现通过线程工厂方法获取到线程

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

除此之外由于worker重写了run方法所以我们在addWorker方法中使用t.start实际上是执行了runWorker方法

      public void run() {
            runWorker(this);
        }

runWorker方法中首先取到我们提交的任务然后while循环获取task(每次任务执行完之后我们会将task设为null,然后调用getTask方法)。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //得到worker对象中我们提交的任务
        Runnable task = w.firstTask;

        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果当前任务为空  那么就从getTask中获得任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //任务执行前调用的方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任务结束后调用的方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask():我们会发现这个getTask方法是一个死循环,这里面有一串代码:

Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

timed取决与allowCoreThreadTimeOut也就是是否允许销毁核心线程池以及线程数是否大于核心线程数。

如果为true那么取任务,如果false 那就调用take方法,一直阻塞队列等待任务添加

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

到这里关于线程池如何构建出来以及执行线程的过程。

总结:

ThreadPoolExecutor是创建线程池的基础,其核心构造器需要我们传入参数有核心线程数,最大线程数,非核心线程数的存活时间,存活时间的单位,任务队列,线程工厂方法,拒绝策略。

当我们传入正确的参数后就创建了一个线程池,此时我们可以通过execute方法执行一个线程任务。在execute方法中最主要的是addWorker这个方法,此方法传入任务类和是否核心线程处理,addWorker方法线程的start方法是执行Worker这个重写run方法的内部类调用的runWorker方法(也就是runWorker是执行run方法的地方)。getTask是保证线程池中核心线程不被销毁的方法

本文分享自微信公众号 - 每天学Java(gh_fddfb9d03324),作者:每天学Java

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-11-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java语言中的线程安全

    “ 在深入理解Java虚拟机一书的高效并发部分中提到:按照线程安全的安全程度由强至弱来排序,可以将Java语言中各种操作共享数据分为5类:不可变,绝对线程安全,...

    每天学Java
  • Spring Boot 执行定时任务

    “ Spring Boot中可以使用注解实现定时任务,十分方便。今天的文章我们首先讲一下个人的项目,然后在文章后面我们将定时任务与线程池结合起来实现每天的个人支...

    每天学Java
  • 浅聊线程中断

    “ 在前面分析Condition的时候,被阻塞的线程在我关闭应用的时候,会抛出异常,这是因为阻塞的线程被其他线程中断了。其实在学习AQS的时候我们也说过线程中断...

    每天学Java
  • 面经手册 · 第21篇《手写线程池,对照学习ThreadPoolExecutor线程池实现原理!》

    正好是2020年,看到这张图还是蛮有意思的。以前小时候总会看到一些科技电影,讲到机器人会怎样怎样,但没想到人似乎被娱乐化的东西,搞成了低头族、大肚子!

    小傅哥
  • 通过ThreadPoolExecutor源码分析线程池实现原理

    线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度...

    GreizLiao
  • Java的重入锁ReentrantLock

    ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该...

    用户3467126
  • 源码分析—ThreadPoolExecutor线程池三大问题及改进方案

    在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺...

    luozhiyun
  • 线程池-线程池源码详解

    在ThreadPoolExecutor的属性定义中频繁地用位移运算来表示线程池状态,位移运算是改变当前值的一种高效手段,包括左移和右移。下面从属性定义开始阅读T...

    DougWang
  • JAVA线程之ThreadLocal与栈封闭(六)

    PS:这次说了线程封闭的概念,其实很容易理解只要知道在ThreadLocal是JVM内部维护了一个Map就可以了。栈封闭没有纤细概述,跟局部变量是一个概念。

    IT故事会
  • 【原创】Java并发编程系列16 | 公平锁与非公平锁

    上一篇提到重入锁 ReentrantLock 支持两种锁,公平锁与非公平锁。那么这篇文章就来介绍一下公平锁与非公平锁。

    java进阶架构师

扫码关注云+社区

领取腾讯云代金券