前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java线程池实现原理

Java线程池实现原理

作者头像
luoxn28
发布2020-11-12 10:51:23
5360
发布2020-11-12 10:51:23
举报
文章被收录于专栏:TopCoderTopCoder

Java中的线程池是运用场景最多的并发组件,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下几个好处:降低资源消耗、提高响应速度、提高线程可管理性和异步代码解耦等。

当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤也需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程数超出maximumPoolSize,该任务将被拒绝,并调用相应的拒绝策略来处理(RejectedExecutionHandler.rejectedExecution()方法,线程池默认的饱和策略是AbortPolicy,也就是抛异常)

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池任务 拒绝策略包括抛异常直接丢弃丢弃队列中最老的任务将任务分发给调用线程处理

线程池的实现主要包括2部分,一个是线程管理(这里的线程管理只包括线程计数、线程信息存储等,不包括线程的阻塞/唤醒),另一个是阻塞队列(包括线程的排队/阻塞/唤醒)。

线程池使用示例如下:

代码语言:javascript
复制
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
        60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> System.out.println("hello world"));

public void execute(Runnable command) {
    /*
     * 1. 运行的线程少于corePoolSize,创建新线程,注意,在addWorker中对mainLock进行lock/unlock操作
     * 2. 成功加入任务后,判断是否需要增加一个线程
     * 3. 如果添加任务失败,尝试创建新线程,如果超过了maxPoolSize,(根据拒绝策略)拒绝任务
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

任务是包裹在Worker对象中的,Worker对象的run方法主要逻辑是:第一次执行任务firstTask,以后都从调用getTask从阻塞队列中获取任务来执行。也就是说,任务线程第一次启动执行任务,不是从阻塞队列中获取的,而是直接执行任务的

注意,一个Worker对象对应一个线程(Thread),新的Worker可视为新线程,Worker继承了Runable和AQS,继承Runable这个好理解,毕竟是任务,那么为什么要继承AQS呢?从javadoc的引用中可以看出:

我们实现了一个简单的非重入互斥锁而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获取锁

这句话怎么理解呢?如果是ReentrantLock,同一个线程在第二次lock/tryLock是返回true的,那么可能会发生这种场景:

代码语言:javascript
复制
-- Worker线程 
-> 调用了setCorePoolSize减少coreSize 
-> 对多余线程进行interruptIdleWorkers 
-> 对将要interruptId的线程进行tryLock()(这里成功!)
->  Thread.interrupt(该Worker线程自己)

那么问题来了,该Workder线程自己把自己中断了!!!具体可以看下interruptIdleWorkers方法代码,这里不再赘述。

接下来看下Worker 线程执行方法:

代码语言:javascript
复制
// class Worker extends AbstractQueuedSynchronizer implements Runnable
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // firstTask为创建线程第一次加入进来的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); // 前置回调方法
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 后置回调方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

调用getTask时,调用workQueue.poll(timeout)或者workQueue.take(),从这里看出,线程的阻塞唤醒操作是由workQueue(阻塞队列)来做的,这里的线程阻塞唤醒实现原理请参考对应资料,这里不再具体展开。

代码语言:javascript
复制
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

至此,我们已经知道了线程池的整个执行流程,那么最后一起回顾下:

线程池的实现主要包括2部分,一个是线程管理(这里的线程管理只包括线程计数、线程信息存储等,不包括线程的阻塞/唤醒),另一个是阻塞队列(包括线程的排队/阻塞/唤醒)。 线程池的任务是包裹在Worker对象中的,Worker对象的run方法主要逻辑是:第一次执行任务firstTask,以后都从调用getTask从阻塞队列中获取任务来执行。也就是说,任务线程第一次启动执行任务,不是从阻塞队列中获取的,而是直接执行任务的。后续线程的等待唤醒都是基于阻塞队列来的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档