Android线程池的详细说明(二)

下面我们来看一看它的工作原理。

线程池的主要状态由一个AtomicInteger变量ctl控制,里面存放了两个概念变量:

  • workerCount 表示有效线程的数量。
  • runState 表示线程池是否在运行或是正在关闭等状态。

<b>workerCount:</b> 为了把这两个变量包装到同一个int中,ThreadPoolExecutor限制了workerCount最大值是2^29-1。workerCount是允许开发和未允许关闭的线程数之和。这个数字可能在短时间与实际存活的线程数不同。比如在ThreadFactory创建线程失败,或是将要退出的线程正在保存状态信息时,workerCount将为用户可见的线程池大小。比如添加线程失败,有可能还没来得及将线程的数量-1。此时取到的线程数,就是存活的线程数+1。

<b>runState :</b> runState则提供了线程池生命周期的控制。它的状态包括:

  • RUNNING: 接受新的任务,并处理排队中的任务。
  • SHUTDOWN:不再接受新的任务,但还是会处理排队的任务。
  • STOP:不再接受新的任务,不处理正在排队的任务,并且会打断正在处理的任务。
  • TIDYING:所有任务都已终止,workerCount为0,线程过渡到此状态将会调用terminate()方法。
  • TERMINATE:terminate()方法调用完成。

为了对runState有序的比较,runState的数字顺序非常重要。runState的状态是单调递增的,但不一定会到达每一个状态。它们的转变顺序是:

  • RUNNING -> SHUTDOWN : 在调用shutdown()方法时,会进行这个转变过程。这个方法也可能在finalize中调用。
  • (RUNNING or SHUTDOWN) -> STOP : 在调用shutdownNow()时,这一转变发生。
  • SHUTDOWN -> TIDYING : 当任务的等待队列和任务池均为空时,进行这一转变。
  • STOP -> TIDYING : 任务池为空时,发生这一转变。
  • TIDYING -> TERMINATED : 当terminated()被钩子方法调用完成时,进行这一转变。

在awaitTermination()中等待的线程会在TERMINATED时返回。检测从SHUTDOWN到TIDYING的转变并不简单,因为在SHUTDOWN状态时,任务队列可能在空和非空之间反反复复。有时还需要重复检查,这个会在后面进一步说明。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

这里我们可以看到CAPACITY其实是workCount的上限,(1 << (Integer.SIZE - 3)) - 1即是一个int型,低29位置1,高位清0。

这样我们就比较容易理解 workerCountOf方法的原理,其实就是取低29位的大小,而runStateOf就是取高3位的大小。ctlOf则是两者之或。

由于ctl是一个多线程公有的数据。所以,对它的修改要格外小心。

在这里是ctl的workCount递增和递减的两个方法:

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

这里我们可以看一下AtomicInteger的这个方法:

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

这个方法比较简单,我们可以清晰地看到它会将workCount递减到0;


workQueue

线程的工作队列,是一个BlockingQueue<Runnable>。这个成员用于保存任务和切换任务。当workQueue.poll();为<b>null</b>时,我们无法保证此时 workQueue一定为空。所以,我们在SHUTDOWNTIDYING时只能依赖isEmpty()方法判断。这包括了特殊的队列,比如DelayQueues,会返回null,然后在delay到期后返回非null的值。


mainLock

mainLock是一个ReentrantLock,持有工作线程的设置和相关的信息。虽然我们可以使用某种并发集来完成这个工作,但事实证明用锁会更好。原因之一就是interruptIdleWorkers系列方法。它们可以避免在shutdown过程中没有必要的频繁中断。否则,退出线程时会同步中断那些还没有中断的线程。这也简化了一些关联的静态统计信息。比如largestPoolSize等等。同样,我们在shutdownshutdownNow方法中,会持有mainLock,这样可以确保线程池在分别退出时的稳定。


workers

workers是一个HashSet<Worker>线程池中所有工作线程的集合。只有当mainLock上锁时,才会被访问。


termination

awaitTermination的条件锁,会在shutdown和shutdownNow中调用。


largestPoolSize

追踪线程池达到的最大线程数。只有在mainLock.lock()后,才能访问。


completedTaskCount

已完成任务的数量。只会在工作线程终止时更新。同样,只会在上锁时被访问。


以上就是ThreadPoolExecutor的基本成员。接下来,介绍一下ThreadPoolExecutor的控制变量。由于涉及并发操作,所有的控制变量都被定义为volatiles为变量。但是不需要加锁。因为所有变量都不会有内部变量,根据他们的值,再对它们进行改变。


threadFactory

创建新线程的工厂。

/** 
 * The {@link Executors#defaultThreadFactory} method provides a more
 * useful simple implementation, that sets the created thread context
 * to known values before returning it.
 * @since 1.5
 * @author Doug Lea
 */
public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

handler

RejectedExecutionHandler类型。在执行execute方法时,如果任务队列完全饱和或已经关闭则调用。


keepAliveTime && allowCoreThreadTimeOut

keepAliveTime控制线程池的超时机制。allowCoreThreadTimeOut为true时,任务如果在等待了keepAliveTime后还没有被执行,任务就会自行终止。


defaultHandler

线程池默认的拒绝策略。


Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker通过继承AbstractQueuedSynchronizer实现了简单的独占锁。并同时持有了任务的RunnableThread。同时,我们可以看到Worker将大量的工作,委托到外部去做,如将Runnable的.run()委托给runWorker,而自己只管理线程和锁的状态。至于Worker委托的方法,我们顺着代码,在下面讲解。


advanceRunState

    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

这个方法用于提升ctl中state。为了处理并发,在这样的小方法中,选择不加锁,而是使用CAS的方式进行同步。每次赋值前先取值,然后通过CAS的原子操作,先进行对比,如果值没有发生改变,则赋上新的值。如果失败则不断地重试。


tryTerminate

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

这个方法是线程池尝试终止的方法。在任何可能让线程池终止地方都需要调用(如减少任务数量,或是在shutdown时移除任务)。在这些地方,这个方法检测线程池各种状态,发现线程池具备了终止的条件,就终止线程池。

首先我们看第一个判断,它描述了哪些情况下,不会终止线程池。如果状态小于SHUTDOWN 或 如果状态不小于TIDYING 或状态为SHUTDOWN的同时工作队列不为空。这些情况下,线程池不会终止。

那么满足线程池终止条件的只有线程池为STOP或是线程池为SHUTDOWN且工作队列为空。

如果满足以上条件,进行第二重判断:如果工作中的任务不为0,则中止一个任务。然后退出。

也就是说,只有在线程池为STOP且没有工作中的任务或是线程池为SHUTDOWN且工作队列为空。才会真正的terminate。同上面的方法一样,用CAS进行检测,然后调用终止。


checkShutdownAccess

    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

这个方法主要做了两个工作。第一是检查调用者有没有关闭线程的权限。即前面所述的:

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

即是在这里进行检查。

第二个工作是对每个Worker进行检查,检查其调用者有没有中断线程的权限。

这个方法会在线程池shutdownshutdownNow时调用。


interruptWorkers

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

对所有Worker调用中断操作。这一段代码比较简单,但同样也埋藏了一些隐患。因为有权限的影响,可能调用者无权中断部分线程。也就是说,调用这个方法后,可能有部分线程依然没有被中断。


interruptIdleWorkers

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

故名思义,这个方法就是中断闲置的线程。我们仔细对比它与上个方法的不同,会发现闲置与否是通过w.tryLock()判断出来的。tryLock,我们在前面讲Worker的方法时提到过,它会判断线程的闲置状态,不再赘述。


有了上面的代码作为铺垫,我们可以开始看execute方法了。作为我们在使用线程池时最为常规的入口,我们从这里开始探索线程池是如何将上面介绍的所有方法,组合起来完成工作的。

execute

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里的过程比较复杂,分为几种情况:

  1. 如果当前工作的线程数,少于核心线程数则直接将任务添加到核心线程。
  2. 如果能够加入等待队列,又分为两种情况:
  • 如果线程池已经关闭,则直接拒绝任务。
  • 否则如果没有工程线程就新建一个非核心线程。
  1. 如果等待队列也无法添加,则直接添加非核心线程。
  2. 如果第3步失败,则直接拒绝。

我们可以看到excute中主要调用的就是addWorkreject两个方法。reject方法比较简单不赘述。我们看一看addWorker方法。


addWorker

addWorker的方法我们可以分为两段。第一个循环结束后才会进入第二段代码,所以我们分段来看。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

我们先来看第一个循环。这一段是没有加锁的,同步方法是通过不断循环检查状态。在完成状态检测后进入内循环。在内循环中我们可以看到对于线程数的判断逻辑。如果compareAndIncrementWorkerCount返回为true,则workerCount已经+1,通过break retry;退出循环。如果runStateOf(c) != rs表示状态出现不同步,重新循环。

这个循环完成后,addWorker会进入第二段代码:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

这一段代码是加锁的。先对状态进行了判断,然后将新创建的Worker对象加入到worker集合中,最后开始线程。在最后的finally中,对worker添加失败做了处理调用了addWorkerFailed

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

由于我们在添加worker之前就增加了workerCount的值,如果失败需要-1,保证workerCount的值的正确性。

接下来,我们就需要看看worker的run()到底是如何运行。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

首先通过getTask();方法获取任务。然后还是一样,检测线程池状态,如果线程池处于STOP或更后面的状态,即可中断线程。之后便是依次调用beforeExecute(wt, task),task.run(),afterExecute(task, thrown)


我们再看看getTask()方法,看看线程池是如何在池子里找到需要执行的任务的。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

前面的代码比较简单,无非取得stateworkercount。然后检查一下线程池的状态,如果已经不在工作状态,则将workercount减到0,并退出。

接下来就有一句比较重要的代码,要判断当次取任务,是超时就作罢,还是一直等待。

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

这里我们可以发现,如果我们在设置时设置了核心线程超时作罢,或者现在的线程数量超过核心线程数时会启用超时作罢。

理一下逻辑就是,超过核心线程数的线程一定会超时作罢,但是核心线程则要取决于我们的设置。

后面我们就可以看到:

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

即是如果拥有超时机制就用poll方法,并加上超时的参数。如果没有超时机制就直接获取不会阻塞,如果没有就是null了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏拭心的安卓进阶之路

并发编程3:线程池的使用与执行流程

并发编程系列的文章酝酿好久了,但由于没有时间和毅力去写那么多练习 demo,很多文章写了一半就停止了。 在写某一系列的过程中总有其他想写的内容蹦出来,想忍住不分...

2337
来自专栏函数式编程语言及工具

ScalaPB(3): gRPC streaming

1662
来自专栏Java编程技术

Java中调度线程池ScheduledThreadPoolExecutor原理探究

前面讲解过Java中线程池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面来介绍另外一部...

642
来自专栏javathings

Java 中的线程池是什么 (面试必背)?

这个文章不会涉及太深的线程知识(太深我也不懂)。这里只是把线程池的一些概念整理一下,当被问到这个题目的时候,尽可能背给面试官听就行了。

1014
来自专栏Android相关

Java线程池---getTask方法解析

/** * Performs blocking or timed wait for a task, depending on * current confi...

802
来自专栏cmazxiaoma的架构师之路

通过了解RejectedExecutionException来分析ThreadPoolExecutor源码

观看本文章之前,最好看一下这篇文章熟悉下ThreadPoolExecutor基础知识。 1.关于Java多线程的一些常考知识点 2.看ThreadPoolE...

702
来自专栏Java架构

看阿里大牛深入浅出Java线程池原理分析与使用

在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面我们主要针对线程池来一步一步揭开线程池的面纱。

4674
来自专栏向治洪

ThreadPoolExecutor运行机制

最近发现几起对ThreadPoolExecutor的误用,其中包括自己,发现都是因为没有仔细看注释和内部运转机制,想当然的揣测参数导致,先看一下新建一个Thr...

1846
来自专栏移动开发的那些事儿

从源码的角度分析ThreadPoolExecutor实现原理

下面继续分析线程池如何管理运行线程,其实就一句话,维护一个线程队列,然后对这个线程队列进行存取操作

862
来自专栏林冠宏的技术文章

通俗易懂,各常用线程池的执行 流程图

corePoolSize,maximumPoolSize,workQueue之间关系。

1233

扫码关注云+社区