前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java线程池

Java线程池

作者头像
spilledyear
发布2020-02-10 11:47:14
9140
发布2020-02-10 11:47:14
举报
文章被收录于专栏:小白鼠小白鼠

基本概念

  1. 任务: 就是你自己实现的任务逻辑,一般为Runnable实现类或Callable实现类,不过在线程池中已经被封装成一个FutureTask. 在我们向线程池中提交一个任务的时候,会先判断目前线程池中的workerCount是否小于核心线程数,如果小于则将这个任务封装成一个Worker,然后启动一个新线程;如果不小于则将这个任务添加到工作队列
  2. 工作队列: 工作队列BlockQueue的实现类,它的作用就是用来缓存任务的,因为它本身是线程安全的,所以在向工作队列的时候不需要格外处理线程安全问题
  3. Worker: 可以认为每个Worker对应一个线程,在我们创建Worker的时候,会传入一个任务,这个任务就是这个Worker首次要执行的逻辑,执行完之后它就会去工作队列拿任务执行. 所有的Worker都保存在一个HashSet数据结构中,所以在向HashSet添加Worker的时候需要去处理线程安全问题,线程池中是通过ReentrantLock来保证线程安全

工作流程

其实在说这个之前我们可以先考虑一下线程池出现的目的: 因为创建线程需要比较大的开销,并且线程数太多的情况下上下文切换比较频繁,所以我们希望有一种机制来改善它,这就是线程池,改善的核心就是控制线程的数量,通过暴露接口,可以满足用户创建不同场景下的线程池

  1. 来任务了,先创建几个线程 核心线程数
  2. 任务太多了,处理不过来,总不能一直创建线程吧,这时候就将任务缓存到 工作队列
  3. 任务实在是太多,工作队列都满了,那就再创建几个线程吧 最大线程数
  4. 任务真的真的太多了,还是处理不过来,拒绝吧,提供了几种 拒绝策略
  5. 其他: 一段时间后,任务太少了,那些一直不工作的线程怎么处理? 空闲时间

使用示例

代码语言:javascript
复制
ExecutorService executorService = new ThreadPoolExecutor(
        1,
        1,
        3,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2),
        (r) -> new Thread(r),
        (r, executor) -> System.out.println("拒绝"));

for (int i = 0; i < 5; i++) {
    executorService.submit(() -> {
        System.out.println(Thread.currentThread().getName());
        sleep(TimeUnit.MILLISECONDS, 50);
    });
}


----------------------------------------------执行结果----------------------------------------------
拒绝
拒绝
Thread-0
Thread-0
Thread-0

ThreadPoolExecutor

线程池的核心实现类,基于ThreadPoolExecutor可以实现满足不同场景的线程池

  1. acl: 类型为AtomicInteger,该变量包括两部分内容: 低29位用于表示workerCount,即线程池中的线程数,高3位用于表示线程池的状态,即RUNNING SHUTDOWN STOP TIDYING TERMINATED
  2. 状态之间的转换
代码语言:javascript
复制
RUNNING -> SHUTDOWN
   On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
   On invocation of shutdownNow()
SHUTDOWN -> TIDYING
   When both queue and pool are empty
STOP -> TIDYING
   When pool is empty
TIDYING -> TERMINATED
   When the terminated() hook method has completed

构造函数

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize: 核心线程数,提交一个任务时,如果线程池中的线程数没有达到核心线程数,则会创建一个新的线程
  2. maximumPoolSize: 最大线程池,工作队列满了的情况下,如果线程池中的线程数没有达到最大线程数,则会创建一个新线程,否则使用拒绝策略
  3. keepAliveTime: 空闲线程存活时间,工作对立中没有任务时,线程最大等待时间,其实就是工作队列的带时间阻塞
  4. workQueue: 工作队列,存放任务的
  5. threadFactory: 创建线程工厂类
  6. handler: 线程池满了情况下,提交任务时对应的拒绝策略,可以自己实现,默认提供了几种

提交任务

代码语言:javascript
复制
// AbstractExecutorService#submit
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 创建一个FutureTask对象
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // 执行 ThreadPoolExecutor#execute
    execute(ftask);
    return ftask;
}
  1. 基于Runnable创建一个FutureTask对象,这样可以获取返回值了,因为Runnable没有返回值,所以这里直接传null
  2. 调用ThreadPoolExecutor#execute方法
ThreadPoolExecutor#execute
代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */

    int c = ctl.get();
    // 如果`workerCount < corePoolSize`,则尝试创建一个新线程,创建成功就直接返回,失败继续下面的流程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 如果线程池正在运行并且将该任务添加到工作队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池是否正在运行,如果不在运行了就将该任务移除并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // 如果`workerCount >= corePoolSize && 工作队列放不下了`,再次尝试添加一个新线程,如果添加失败则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
  1. 如果workerCount < corePoolSize,则尝试创建一个新线程,创建成功就直接返回,失败继续下面的流程
  2. workerCount >= corePoolSize,再次检查线程池是否正在运行,如果不在运行了就将该任务移除并执行拒绝策略
  3. 如果workerCount >= corePoolSize && 工作队列放不下了,再次尝试添加一个新线程,如果添加失败则执行拒绝策略
ThreadPoolExecutor#addWorker

该方法用于尝试向线程池中添加一个新的线程,如果线程池运行状态不正常,则会添加失败

代码语言:javascript
复制
1. `firstTask`: 任务的具体逻辑,这里是一个`FutureTask`对象
2. `core`: 如果为true,这和`corePoolSize`比较,否则和`maximumPoolSize`比较. 因为执行`addWorker`方法只有两种情况:一种是`workerCount<corePoolSize`;一种是工作队列已满,这时需要和`maximumPoolSize`比较
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.  仅在必要时检查队列是否为空, 状态 第二个括号里的条件估计和SHUTDOWN语义有关,后面再看吧
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        // 通过自旋操作更新`workerCount`的值,即:加1
        for (;;) {
            // 获取目前线程池中的线程数
            int wc = workerCountOf(c);

            // 因为执行`addWorker`方法只有两种情况:一种是`workerCount<corePoolSize`,这时需要和`corePoolSize`比较; 一种是工作队列已满,这时需要和`maximumPoolSize`比较
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;

            // 通过CAS操作尝试对`workerCount`加1,如果成功就跳出最外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;

            c = ctl.get();  // Re-read ctl

            // 如果在自旋(内循环更新`workerCount`值)期间,线程池的状态发生变化,重新进入外循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里的`ReentrantLock`主要作用是保证添加`Worker`到`workers`时是线程安全的,因为`workers`是`HashSet`结构,其本身不是线程安全的
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 如果线程池正在运行
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 如果t.isAlive()这说明线程已经被启动了,这时候直接抛异常,一般不会出现 
                    if (t.isAlive()) // precheck that t is startable
                        throw new fIllegalThreadStateException();

                    // 将该任务添加到`workers`中,`workers`是一个`HashSet`结构,不过这里通过`ReentrantLock`保证它是线程安全的
                    workers.add(w);

                    int s = workers.size();
                    // 更新`largestPoolSize`,该值用于表示线程池中曾经达到的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;

                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 最后启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 根据上面的代码来判断,如果线程池运行状态不正常的时候,会添加`Worker`失败,然后执行`addWorkerFailed`方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

注释已经写的很详细了,总结一下:

  1. 先通过自旋更新workerCount的值
  2. 添加Workerworkers中时需要通过ReentrantLock保证线程安全,因为workersHashSet结构,其本身不是线程安全的
  3. 线程池运行状态不正常时,会添加Worker失败,此时需要执行ThreadPoolExecutor#addWorkerFailed方法
ThreadPoolExecutor#addWorkerFailed

执行到这里,说明线程池可能已经出现了问题,这时候需要回滚之气那的操作.即恢复workerCount的值,然后将该Workerworkers中移除,并尝试停止线程池

代码语言:javascript
复制
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // 从`HashSet`中移除
            workers.remove(w);
        // 对`workerCount`减1
        decrementWorkerCount();
        // 尝试停止线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
  1. HashSet中移除刚刚添加的Worker
  2. workerCount减1
  3. 尝试停止线程池

执行任务

通过上面的代码可以发现,提交任务的时候,如果创建了一个新的Worker实例,就相当于创建了一个新的线程,并且会启动该线程. 那线程启动之后主要做了什么? Thread#start => Worker#run => ThreadPoolExecutor#runWorker

Worker
  1. 实现了Runnable接口,因此当线程启动之后,就会执行Worker#run方法
  2. 继承自AbstractQueuedSynchronizer,说明它具有锁的功能,但它是不可重入锁
  3. 在构造函数中,已自己为参数,创建一个线程,并将该线程作为自己的一个属性thread
代码语言:javascript
复制
Worker(Runnable firstTask) {
    // 设置state=-1,则无法获取锁, 在runWorker中会先执行unlock方法,然后再执行lock方法获取锁
    setState(-1); // inhibit interrupts until runWorker
    // 最开始执行的那个任务,之后的任务去队列里面拿
    this.firstTask = firstTask;
    // 以自己为参数创建一个线程
    this.thread = getThreadFactory().newThread(this);
}

Worker#run方法中,直接调用了ThreadPoolExecutor#runWorker方法

ThreadPoolExecutor#runWorker
代码语言:javascript
复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 先取第一个任务执行,这是在构造函数中传入的
    Runnable task = w.firstTask;
    w.firstTask = null;

    // 因为在Worker构造函数中默认设置了state为-1,需要先执行`Worker#unlock`将state设置为0
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果首个任务不为null并且工作队列里面还有任务
        while (task != null || (task = getTask()) != null) {
            w.lock();

            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 有点不太懂
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();

            try {
                // 前置处理,默认为空
                beforeExecute(wt, task);

                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置处理,默认为空
                    afterExecute(task, thrown);
                }
            } finally {
                // 设置task为null, 用于取下一个任务
                task = null;
                // 完成任务数加1
                w.completedTasks++;
                w.unlock();
            }
        }

        // 工作队列里面没任务了,并且在获取任务时在工作队列上阻塞的时候大于空闲时间,并且时正常结束的,即没有发生什么异常
        completedAbruptly = false;
    } finally {
        // 将该Worker从HashSet中移除,执行一些销毁操作
        processWorkerExit(w, completedAbruptly);
    }
}
  1. 先执行firstTask,该任务在创建Worker时传入
  2. 再从工作队列中取任务执行
  3. 执行完成之后,说明runWorker将要退出了,这时候同时需要将该Worker从HashSet中移除

todo 最核心的,中断处理,即那个判断条件,也就是Worker实现AQS的目的

空闲线程清理

在创建线程池的时候,有提到一个参数:空闲时间,这个空闲时间是什么意思呢? Worker执行完firstTask之后,就会去工作队列中拿任务继续执行,工作队列是一个阻塞队列,当工作队列中没有任务时,线程就会阻塞,直到有提交了新的任务. 这个空闲时间其实就可以理解成该线程的阻塞时间,这部分逻辑在ThreadPoolExecutor#getTask方法中

代码语言:javascript
复制
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // allowCoreThreadTimeOut表示线程是否永久存货, 默认是永久存活, 结合下面的代码说明在这两种情况下,空闲时间生效: 1.allowCoreThreadTimeOut==true  2.工作线程数大于corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  1. 空闲时间其实就是线程在阻塞队列上阻塞的最大时间,即通过阻塞队列实现
  2. 在这两种情况下,空闲时间才会生效: allowCoreThreadTimeOut==true 或者 工作线程数大于corePoolSize

常见线程池

通过Executors可以快速的创建一些不同类型的线程池

ExecutorService#newSingleThreadExecutor

代码语言:javascript
复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  1. corePoolSize为1,maximumPoolSize为1,意味着线程池中最多只有一个工作线程
  2. 空闲时间为0,表示没任务立即销毁该线程
  3. 工作队列LinkedBlockingQueue,这其实是一个有界的阻塞队列,但是由于这里没有在创建LinkedBlockingQueue的时设置容量,所以默认为Integer.MAX_VALUE

优缺点

  1. 对阻塞队列的长度没有限制,可能会造成OOM

ExecutorService#newCachedThreadPool

代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  1. corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,意味着线程数可以为 Integer.MAX_VALUE
  2. 空闲时间为60s,也就是一分钟 3.工作队列SynchronousQueue,这是一个比较特殊的阻塞队列,当一个生产者线程向队列中存数据时,生产者线程将被阻塞直到有另一个消费者线程从队列中取数据(即take),反之亦然

优缺点

  1. 适合执行时间比较短的任务,这种情况下,很多线程可以被复用,避免每次都创建大量线程的开销
  2. 但在任务执行时间比较长的情况,由于该线程池对线程数没有限制,可能会创建非常多的线程.

ExecutorService#newFixedThreadPool

代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
  1. corePoolSize为入参,maximumPoolSize为入参
  2. 空闲时间为0,表示没任务立即销毁该线程
  3. 工作队列LinkedBlockingQueue,这其实是一个有界的阻塞队列,但是由于这里没有在创建LinkedBlockingQueue的时设置容量,所以默认为Integer.MAX_VALUE

这个其实和newSingleThreadExecutor有点像,只不过newSingleThreadExecutor中只有一个线程,而newFixedThreadPool是固定的线程

优缺点

  1. 对阻塞队列的长度没有限制,可能会造成OOM

ExecutorService#ScheduledThreadPool

代码语言:javascript
复制
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(),threadFactory);
}
  1. corePoolSize为入参,maximumPoolSize为Integer.MAX_VALUE
  2. 空闲时间为0,表示没任务立即销毁该线程
  3. 工作队列DelayedWorkQueue,这是一个有界的阻塞队列

优缺点

  1. 对阻塞队列的长度没有限制,可能会造成OOM

总结

还是推荐根据具体场景,基于ThreadPoolExecutor定制自己的线程池

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本概念
  • ThreadPoolExecutor
    • 构造函数
      • 提交任务
        • 执行任务
          • 空闲线程清理
          • 常见线程池
            • ExecutorService#newSingleThreadExecutor
              • ExecutorService#newCachedThreadPool
                • ExecutorService#newFixedThreadPool
                  • ExecutorService#ScheduledThreadPool
                    • 总结
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档