前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之ThreadPoolExecutor Worker

AQS源码分析之ThreadPoolExecutor Worker

作者头像
山行AI
发布2020-03-25 09:24:56
1.6K0
发布2020-03-25 09:24:56
举报
文章被收录于专栏:山行AI

先来看下ThreadPoolExecutor#execute:

代码语言:javascript
复制
public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        // 1. 工作线程数小于corePoolSize时,添加新的worker线程        // 2. 如果达到了corePoolSize,此时一个任务如果能成功入队列(也就是说队列没有满时),需要再进一步来二次确认是否需要添加worker        // 3. 如果任务不能入队列,将尝试添加一个worker直到worker数量达到maxPoolSize        // 4. 如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的        int c = ctl.get();// ctl是一个AtomicInteger类型的值,它的某几位标识着不同的值,如workerCount的值和运行的状态,详情参考:https://blog.csdn.net/yjw123456/article/details/77719061        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        // 线程池是否处于RUNNING状态,以及是否能入队列        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            // 线程池是否处于RUNNING状态,从队列中移除当前command是否成功,因为在上面的判断中进行了入队操作            if (! isRunning(recheck) && remove(command))                // 线程池不是RUNNING状态时将command从队列中移除并进行reject                reject(command);            // 如果worker线程的数量为0,则添加worker            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        // 对应上面的第3种情况        else if (!addWorker(command, false))            // 如果添加失败,则执行reject            reject(command);    }

总结一下,上面的方法主要包括如下几点:

  1. 工作线程数小于corePoolSize时,添加新的worker线程;
  2. 如果达到了corePoolSize,此时一个任务如果能成功入队列(也就是说队列没有满时),需要再进一步来二次确认是否需要添加worker;
  3. 如果任务不能入队列,将尝试添加一个worker直到worker数量达到maxPoolSize;
  4. 如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的。

java.util.concurrent.ThreadPoolExecutor.Worker

worker的结构图:
代码部分:
代码语言:javascript
复制
private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;// 对应worker的线程        /** Initial task to run.  Possibly null. */        Runnable firstTask;// 初始时要运行的task        /** Per-thread task counter */        volatile long completedTasks;// 运行完成的任务数
        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            // 设置AQS的state值为-1            setState(-1); // inhibit interrupts until runWorker            // 初始要运行的task            this.firstTask = firstTask;            // 使用threadFactory创建一个线程            this.thread = getThreadFactory().newThread(this);        }
        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }
        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {// 是否处理独占状态            return getState() != 0;        }
        protected boolean tryAcquire(int unused) {// 尝试获取许可            if (compareAndSetState(0, 1)) {//cas设置state值从0到1                // 设置成功时将当前线程设置为独占线程                setExclusiveOwnerThread(Thread.currentThread());                // 获取成功                return true;            }            // 获取失败            return false;        }
        // 尝试释放占有的许可        protected boolean tryRelease(int unused) {            // 重置Worker的独占线程为null            setExclusiveOwnerThread(null);            // 状态置为0            setState(0);            return true;        }        // 调用AQS的acquire方法        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        // 调用AQS的release方法        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }

tryLock的使用主要在interruptIdleWorkers方法中:

代码语言:javascript
复制
private void interruptIdleWorkers(boolean onlyOne) {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            for (Worker w : workers) {                Thread t = w.thread;                if (!t.isInterrupted() && w.tryLock()) {                    try {                        t.interrupt();                    } catch (SecurityException ignore) {                    } finally {                        w.unlock();                    }                }                if (onlyOne)                    break;            }        } finally {            mainLock.unlock();        }    }

它的主要作用是在对worker进行interrupt操作时需要先获取worker的独占锁。

而lock和unlock方法的调用:

代码语言:javascript
复制
final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                // 在每次运行一个任务之前要先对worker锁定,然后在执行完之后进行解锁                w.lock();               ---------------省略部分代码-----------------                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                -------------省略部分代码------------------                } finally {                    task = null;                    w.completedTasks++;                    // 进行解锁                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁。

另外线程池中还有一个锁:

代码语言:javascript
复制
 private final ReentrantLock mainLock = new ReentrantLock();
 /**     * Wait condition to support awaitTermination     */    private final Condition termination = mainLock.newCondition();

它的作用主要是在停止线程池时来控制用来做停止操作的线程:

代码语言:javascript
复制
 final void tryTerminate() {        for (;;) {          ----------省略部分代码----------------------            final ReentrantLock mainLock = this.mainLock;            // 加锁            mainLock.lock();            try {                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {                    try {                        terminated();                    } finally {                        ctl.set(ctlOf(TERMINATED, 0));                        // 唤醒                        termination.signalAll();                    }                    return;                }            } finally {                mainLock.unlock();            }            // else retry on failed CAS        }    }

它主要为了客户端可以调用的java.util.concurrent.ThreadPoolExecutor#awaitTermination方法服务的,awaitTermination会调用conditon的await方法进行阻塞。在这里就不进行详细地分析了,后续分析线程池源码时再具体分析。

总结

关于worker的部分我们就简要地介绍这么多。它继承AQS的主要目的是在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁,这样方便管理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 先来看下ThreadPoolExecutor#execute:
  • java.util.concurrent.ThreadPoolExecutor.Worker
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档