JDK7 ThreadPoolExecutor execute(Runnable command) 方法解析

/**
     * 通过这个方法提交的线程,将在新的线程,或者已有的(线程池)线程中执行
     * 
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get(); //c是线程池运行状态值和线程数目|的值,通过c既可以取得线程数目也可以取得线程池运行状态。
        if (workerCountOf(c) < corePoolSize) {//如果当前线程数量小于corePoolSize
            if (addWorker(command, true)) //调用addWorker 第二个参数传true,以corePoolSize作为线程池边界,创建一个新线程
                return;
            c = ctl.get();
        }
	//否则,加入等待队列
        if (isRunning(c) && workQueue.offer(command)) {//如果把任务放入处理队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//线程池不在运行,从队列中删除任务,执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)//若果线程池为空,创建一个空任务线程,去执行等待队列中的任务
                addWorker(null, false);
        }
	//如果添加失败,第二个参数传false,以maximumPoolSize作为线程池边界添加线程
        else if (!addWorker(command, false))//,如果添加失败,执行拒绝策略
            reject(command);
    }

//具体看下addWorker
     /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread#start), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//可以添加,先把线程数目加1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);//给任务创建新线程
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//把新线程加入线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//开始用新线程执行任务。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)//如果添加执行失败,把线程从线程池删除。
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    //接着看下Worker类, 它是一个私有内部类
        private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
	///两个私有变量 具体执行任务的线程thread,由线程工厂创建和要执行的任务firstTask
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {//开始执行任务,重点看下面执行方法,它是父类的runWorker方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

     ///worker 执行方法调用
      final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//如果创建线程时,没指定具体任务,就调用getTask()从等待队列中取任务。具体看下面getTask()方法。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//任务执行前调用,子类重写可做监控
                    Throwable thrown = null;
                    try {
                        task.run();//具体任务的方法 
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//任务执行后调用,子类重写可做监控
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//清除执行完任务的线程,补充一个新的空线程(如果需要)
        }
    }

      /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池有存活超时设置,或者,线程数目超过了corePoolSize,就设置true

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
	    //从队列尾端取一个任务。是超时等待,还是阻塞等待,前面定了策略。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏不会写文章的程序员不是好厨师

利用LockSupport实现简单Future

上篇文章已经讲到了LockSupport提供的功能,以及如何使用LockSupport实现锁的语义,本文将介绍Future的语义以及如何利用LockSuppor...

9730
来自专栏编码小白

ofbiz实体引擎(一) 获取Delegator

public abstract class DelegatorFactory implements Factory<Delegator, String> { ...

39250
来自专栏会跳舞的机器人

java并发编程的艺术第十章——Executor框架

Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable...

12420
来自专栏Jack-Cui

225. Implement Stack using Queues(Stack-Easy)

Implement the following operations of a stack using queues. push(x) – Push eleme...

205100
来自专栏Java编程技术

FutureTask 原理

如上代码主线程会在futureTask.get()出阻塞直到task任务执行完毕,并且会返回结果。

10410
来自专栏猿天地

面试-线程池的成长之路

线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。

17220
来自专栏小樱的经验随笔

HDU 1232 畅通工程

畅通工程 Time Limit: 4000/2000 MS (Java/Others)    Memory Limit: 65536/32768 K (Java...

328100
来自专栏向治洪

java线程池ThreadPoolExecutor 如何与 AsyncTask() 组合使用

简单说下Executors类,提供的一系列创建线程池的方法: 他们都有两个构造方法 1. --------newFixedThreadPool (创建一个定长线...

20380
来自专栏向治洪

ScheduledThreadPoolExecutor原理探究

简介 ThreadPoolExecutor是Executors中一部分功能,下面来介绍另外一部分功能也就是ScheduledThreadPoolExecutor...

334100
来自专栏Phoenix的Android之旅

关于ThreadPoolExecutor要注意的问题

之前我们说过关于线程池的问题,我们可以用Executors的各种方法来获取不同的ThreadPoolExecutor来满足需求。但是当我们需要自定义线程池的时候...

8830

扫码关注云+社区

领取腾讯云代金券