前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ThreadPoolExecutor 几个疑惑与解答

ThreadPoolExecutor 几个疑惑与解答

作者头像
用户3148308
发布2019-05-06 17:02:57
2790
发布2019-05-06 17:02:57
举报
文章被收录于专栏:xiaoheikexiaoheike
  1. 任务是否都要先放入队列? 当工作线程数小于核心线程数时,任务是不会经过队列,而是直接创建 Worker 时传入。但是如果工作线程数已经大于核心线程数,则任务是要先放入队列的。实际上只要是被创建的工作线程所执行都是不需要经过工作队列的,而是在创建新工作线程时作为参数传入处理。对应就是调用 addWorker 方法的地方。
代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 工作线程数<核心线程数,创建核心线程并直接执行该任务
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 工作线程数大于核心线程数,小于最大允许线程数,创建线程并执行该任务
    else if (!addWorker(command, false))
        reject(command);
}
  1. 什么时候创建额外的线程 队列已经满了,并且当前工作线程数小于最大允许线程数才会创建额外的线程。实际上所有调用 addWorker 方法的地方都会经过该判断。
代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 队列队列已经满了,才会执行 addWorker 逻辑
    else if (!addWorker(command, false))
        reject(command);
}


// addWorker 中的部分代码
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        // 增加新的线程数
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
  1. 怎么销毁多余线程 当队列中没有任务之后,执行的线程将会被销毁。
代码语言:javascript
复制
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // getTask() 返回为null,也就是队列中没有任务了
            while (task != null || (task = getTask()) != null) {
                // 省略代码
            }
            completedAbruptly = false;
        } finally {
            // 移除worker
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. 如何实现让多余线程在指定时间后销毁? 超过核心线程数的线程,获取队列任务会使用 poll 方法,增加阻塞时间,如果在指定的时间没有任务到达,就会返回null,从而销毁该线程
代码语言:javascript
复制
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // poll 方法设置时间,也就是获取任务增加阻塞时间
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  1. Worker 继承AQS的作用 继承了AQS类,可以方便的实现工作线程的中止操作;
  2. 可以设置的最大线程数 2^29 次方,因为ctl高三位被用于表示当前线程池的状态了,所以只有29位用于表示最大线程池的大小。
代码语言:javascript
复制
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
  1. 拒绝策略什么时候起作用? 工作线程数已经达到 maximumPoolSize,并且队列已经满了,则会启用拒绝策略
代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 工作线程数<核心线程数,创建核心线程并直接执行该任务
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        // 工作线程数已经达到达到设置值,队列也已经满,则拒绝
        reject(command);
}

欢迎转载,但请注明本文链接,谢谢你。 2019.04.21 17:47

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-04-21 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档