前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java高并发:线程池源码解析

Java高并发:线程池源码解析

原创
作者头像
冰寒火
修改2023-03-03 22:13:17
4000
修改2023-03-03 22:13:17
举报
文章被收录于专栏:软件设计软件设计

一、基本概念

1 作用

线程池(Thread Pool)是一种基于池化思想管理线程的工具,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

2 线程池状态

线程池状态
线程池状态

3 线程池类关系

类关系
类关系

二、线程池

1 数据结构

代码语言:java
复制
public class ThreadPoolExecutor extends AbstractExecutorService {

    //ctl是原子类型,32位,高3位是线程池状态,低29位是线程数量。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //任务队列,不受mainlock保护,需要使用一个线程安全的容器
    private final BlockingQueue<Runnable> workQueue;
	
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程
    private final HashSet<Worker> workers = new HashSet<Worker>();

    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

   
    private volatile ThreadFactory threadFactory;

    //当任务队列已满时采取的拒绝策略
    private volatile RejectedExecutionHandler handler;

    //非核心线程最大空闲时间,超过就终止
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;


    //核心线程数量,allowCoreThreadTimeOut默认为false,核心线程常驻内存
    private volatile int corePoolSize;

    //最大线程数量
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

}

2 execute

该方法继承自Execute接口,用于向线程池提交任务。

代码语言:java
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    //1 判断线程数量是否达到核心线程数,如果未达到就添加线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) //添加线程
            return;
        c = ctl.get();
    }
	//2 判断任务队列是否已满,如果未满则入队,如果已满则尝试添加线程来处理
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //线程池此时非运行态,不接受任务,移除任务并拒绝
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //任务队列未满,
        reject(command);//3 线程达到最大线程数,此时执行拒绝策略
}
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

添加线程需要提供该线程执行的task,并指示是添加核心线程还是普通线程,这决定了是与核心线程数比较还是与最大线程数比较。添加核心线程addWorker(command, true),添加普通线程addWorker(command, false)

代码语言:java
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //如果线程数量达到上限,或者达到核心线程数/最大线程数,则返回false,
            //由上层执行入队或者拒绝策略
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //线程数加1并跳出双层for循环,往后执行创建worker流程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //线程池状态发生改变,回到方法开头的"retry:"处往下重新执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
            	//线程池正常运行时,将新线程添加到workers中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //此时thread还未执行start,不应该是alive状态
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;//largestPoolSize不影响线程池大小,只是记录
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //启动新线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //线程启动失败时收尾
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
//线程启动失败,需要将ctl中线程数减1,从workers中移除该线程
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

这个方法中的"retry:"是标签,类似于goto,可以无条件跳转,慎用?。 一般Java中的标签用于多层循环,可以从内层循环一次跳出所有循环。 continue retry表示跳到标签位置处,重新执行。 break retry表示跳出多层循环,然后继续往下执行,不需要回到"retry:"位置。

3 线程池关闭

3.1 shutdown

代码语言:java
复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检测线程方法权限
        checkShutdownAccess();
        //将ctl修改为SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        //将现有的线程中断
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //先与Worker线程抢锁,确保Worker线程没有处于某个任务执行中的状态
            //将Worker线程中断
            //worker线程每次从workQueue拿取任务后都会加锁,处理完任务再释放锁
            //所以Worker线程没抢到锁就认为它是空闲的,疑惑???
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

3.2 shutdownNow

代码语言:java
复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //将ctl设置为STOP状态
        advanceRunState(STOP);
        //中断所有Worker线程,不管是否处于正在执行task中
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

三、Worker

1 数据结构

工作者,封装了线程,当没有空闲核心线程且未达到最大线程数时就会创建新的worker。

Worker实现了Runnable接口,是thread执行的主任务,这个任务的主要逻辑是不断从任务队列拿task执行,并检测线程池状态,如果线程池为STOP/TIDYING状态,就调用interrupt中断自己。

代码语言:java
复制
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    }
//构造器,初始化时state=-1,禁止interrupt
Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

2 run

代码语言:java
复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
	//Worker继承了AQS
    w.unlock(); // 构造器中初始时将state=-1,防止interrupt
    boolean completedAbruptly = true;
    try {
        //不断从任务队列取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池状态>=STOP,不得再处理任务,包括进队的任务,此时线程主动中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//线程主动设置中断标志
            try {
                beforeExecute(wt, task);//钩子方法,未实现
                Throwable thrown = null;
                try {
                    task.run();//执行具体任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3 为什么继承AQS

Worker继承了AQS,实现了一个非阻塞、不可中断的锁。当外部缩小线程池核心线程数时,线程池会interrupt现有workers列表所有线程。为了安全,应该等线程处理完当前正在执行的task后才能interrupt,所以外部线程需要和Worker内线程抢锁,获取锁后再中断Worker内线程。

代码语言:java
复制
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
	//如果核心线程数变小,则中断空闲线程
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        //如果核心线程数变大,则提前启动新线程
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

四、拒绝策略

当任务队列已满,线程数量达到最大时,对后续到达的任务采用预定的策略处理。

代码语言:java
复制
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

官方实现了四种常见策略,如下:

代码语言:java
复制
//抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
//直接丢弃,置之不理
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
//丢弃最旧的任务空出位置,然后将该任务重新投入线程池
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
//由线程池外的调用者线程直接执行该任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基本概念
    • 1 作用
      • 2 线程池状态
        • 3 线程池类关系
        • 二、线程池
          • 1 数据结构
            • 2 execute
              • 3 线程池关闭
                • 3.1 shutdown
                • 3.2 shutdownNow
            • 三、Worker
              • 1 数据结构
                • 2 run
                  • 3 为什么继承AQS
                  • 四、拒绝策略
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档