在上篇文章:Java 多线程—线程池(上) 中我们看了一下 Java 中的阻塞队列,我们知道阻塞队列是一种可以对线程进行阻塞控制的队列,并且在前面我们也使用了阻塞队列来实现 生产者-消费者模型
。在文章最后,我们还看了一下 Future
接口和其中对应的方法,如果你对这些不熟悉,建议先去看一下上一篇文章。有了前面的知识作为基础之后,我们来正式看一下 Java 中的线程池。
首先来看一下线程池的作用:Java 已经给我们提供了多线程机制,那么线程池是为了解决什么问题呢? 我们设想一下:假设现在我们打算使用 Java 编写一个服务器端的程序,那么对于每个用户的请求,为了提高服务器资源的利用率和用户请求的响应速度,我们可能会采用给每一个用户请求都新建一个线程来处理这个请求并且在处理完成之后返回响应,那么这样的话可能会带来一个问题:假设用户的访问非常频繁,并且每次请求的资源都是比较简单的,即对于每个请求,服务器可能只需要很少的资源就可以处理好,那么此时频繁的创建新线程来处理用户请求就可能会导致创建和销毁线程所带来的服务器资源开销本身就大于处理用户请求的开销了。这样的话明显得不偿失。 那么使用线程池能够解决这个问题吗?答案是可以。线程池可以理解成一个处理任务的线程集合,其中的线程有一个特点是:在某个线程处理完成某个任务之后并不会立即被销毁,而是会保留一段时间(这个取决于不同种类的线程池的不同实现),而如果之后又有任务来了,那么线程就会处理之后来的任务。这样的话就避免了只通过一味的创建新线程来处理任务的缺陷。而线程池本身也是适用于处理那些任务繁多并且每个任务比较简单(相对而言)的情景。我们来通过一幅图来理解一下线程池工作的基本原理:
其中的任务队列即为阻塞队列,当然这只是代表线程池的基本原理,对于不同设计理念的线程池在具体实现上肯定会有所差异。下面来看一下 Java 中的线程池。
Java 中提供了一个 Executors
类,这个类类似于线程池的工厂,我们可以通过它来创建各类线程池,我们看看其中的一些方法:
public class Executors {
// ....
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// ....
}
我们会发现 Executors
提供了大多数创建线程池的方法最后都是返回一个新的 ThreadPoolExecutor
对象,我们来重点看一下 ThreadPoolExecutor
类:
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// ...
}
我截取了这个类中带有 7 个参数的构造方法,这个类提供了多个构造方法,但是终究是调用了这个带有 7 个参数的构造方法,我们来分析一下这个构造方法:
在此之前,我们还得再仔细了解一下 Java 中线程池的原理,相比在文章开头提供的那副图中解释的线程池原理,Java 提供的线程池原理更加复杂一些,Java 中线程池中的线程分为 核心线程
和 非核心线程
两种,两者有什么却别呢?
我们知道:一个线程池中提供的线程的数量是有限的,而在有新任务添加到线程池中时,如果线程中的核心线程数没有到达规定的核心线程最大数,那么便会创建新的核心线程来执行任务,否则的话就会把任务附加到 任务队列
的末尾,对于这个操作也有两种可能:如果任务队列未满,那么将任务添加到任务队列中,否则的话就会创建非核心线程来执行任务。那么如果非核心线程的数量也达到了最大值呢?这个时候线程池就会执行 饱和策略
了。这个过程可以通过下面的图来描述:
理解这个过程之后,下面回到这个类的构造方法中,我们再来看这个类的构造方法的参数:
corePoolSize:线程池中的最大核心线程数,默认情况下线程池是空的,没有线程,
只有在有任务提交到线程池中时才会创建线程,如果调用线程池对象的 prestartAllCoreThread() 方法,
那么线程池会提前创建好所有的核心线程。
maximumPoolSize:线程池中允许创建的最大线程数,
上文所说的非核心线程数为 maximumPoolSize - corePoolSize ,即为最大线程数减去核心线程数。
keepAliveTime:线程池中非核心线程允许闲置的最长时间,
超过这个时间的非核心线程将会被回收,对于任务很多并且每个任务处理时间较短的的情况,
可以适当提升 keepAliveTime 参数来提高线程利用率。
当设置 allowCoreThreadTimeOut 属性为 true时,keepAliveTime 参数也会作用到核心线程上。
unit:keepAliveTime 参数的时间单位
(天:DAYS、小时:HOURS、分钟:MINUTES、秒:SECONDS、毫秒:MILLISENDS 等)
workQueue:任务队列,储存任务的阻塞队列,上篇文章中有介绍
threadFactory:创建线程的工厂,一般情况使用默认的即可
handler:饱和策略,即为当任务队列和线程池中线程数均达到饱和时采取的应对策略,
默认是 AbordPolicy,表示无法处理新的任务,
并在有新任务提交时抛出 RejectedExecutionException 异常,此外还有 3 中策略:
1、CallerRunnsPolicy:使用提交该任务的线程来处理此任务
2、DiscardPolicy:不执行该任务,并将该任务删除
3、DiscardOldestPolicy:丢弃队列中最近的任务,并执行当前提交的任务
除去看构造方法之外,在类的声明中,我们发现,ThreadPoolExecutor
继承于 AbstractExecutorService
类,我们先一步步向上追溯到本源,我们来看看 AbstractExecutorService
这个类:
public abstract class AbstractExecutorService implements ExecutorService {
// ......
}
注意到这是个抽象类,这个类实现了 ExecutorService
接口,还是继续看一下这个接口吧:
public interface ExecutorService extends Executor {
// ....
}
这个接口又继承了 Executor
接口,注意这个是 Executor
接口,不是 Executors
类,最后来看看 Executor
接口:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
终于完结了。这个就是本源接口了,这个接口就声明了一个 execute
方法,参数是一个 Runnable
对象,这个方法其实就是向线程池中提交任务的核心方法,command 参数即为要执行的任务的 Runnable 对象。好了,现在我们从本源向下回去再看一遍涉及到的类和接口,先是 ExecutorService
接口:
public interface ExecutorService extends Executor {
/**
* 这个方法用于关闭线程池,调用这个方法之后,通过 execute 方法提交的任务将不会被接受,
* 但是其会等待线程池中任务队列中已有的任务和正在执行的任务执行完成之后再关闭线程池
*/
void shutdown();
/**
* 这个方法尝试立即关闭线程池,停止处理器正在执行的任务并拒绝接受新的任务,
* 并且将任务队列中未被执行的任务添加到一个 List 列表作为返回值返回,
* 请注意和 shutdown 方法的区别
*/
List<Runnable> shutdownNow();
/**
* 判断线程池是否已被关闭
*/
boolean isShutdown();
/**
* 判断线程池中所有的任务是否被完全终止,
* 只有在调用了 shutdown 或者 shutdownNow 方法之后这个方法才可能返回 true
*/
boolean isTerminated();
/**
* 阻塞调用该方法的线程,直到发生了下面三种情况:
* 1、方法参数规定的时间段过去,此时方法返回
* 2、线程池中所有的任务执行完成并且线程池被关闭,方法返回
* 3、调用该方法的线程发生了 InterruptedException 异常,此时方法会抛出 InterruptedException 异常
* 如果线程池中所有任务被成功的完成并且线程池成功关闭,那么方法返回 true,否则方法返回 false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交新任务到线程池中,返回一个 Future 对象,这个对象封装了获取任务执行状态信息的方法
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交新任务到线程池中,将任务执行状态信息储存在 result 对象中,
* 最后返回一个 Future 对象,提供获取任务执行状态信息的方法
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 功能同上,参数变成了 Runnable 对象
*/
Future<?> submit(Runnable task);
/**
* 提交多个任务到线程池中,并且阻塞调用该方法的线程,
* 直到所有的任务都被执行完成或者调用该方法的线程发生 InterruptedException 异常,此时方法会抛出该异常,
* 方法返回一个保存了每个任务的执行状态信息的 Future 对象的 List 对象
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 功能同上,不过添加了时间参数限制,即为如果线程池没有在参数规定的时间内执行完成所有的任务,
* 那么方法会强制返回,此时,执行完成的任务对应的 Future 对象的 isDone() 方法返回 true,
* 代表对应任务执行完成,其他任务对应的 Future 对象的 isCancelled() 方法返回 true,
* 代表任务未执行完成并且被取消。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 也是提交多个任务到线程池中,并且阻塞调用该方法的线程,但是这个方法是有任意一个任务被执行完成就会返回,
* 返回执行完成的那个任务的执行结果(即为对应任务 Callable 对象的 call() 方法的返回结果)
* 同样的如果在执行任务过程中调用该方法的线程发生了中断,方法会抛出一个 ExecutionException 异常
* 如果没有任何一个任务成功执行,那么方法会抛出一个 ExecutionException 异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 和上面方法同样的功能,在其基础上加了时间限制,即如果在规定时间内没有任何一个提交的任务执行完成,
* 该方法会返回,同时抛出一个 TimeoutException 异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
我们继续向下,前面已经知道 AbstractExecutorService
实现了这个接口,我们来看看这个类的关键方法:
public abstract class AbstractExecutorService implements ExecutorService {
// ...
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// ...
}
这个类中的大多数方法均为实现的 ExecutorService
接口中的方法,这个接口中的方法在上面的代码中已经有详细的注释了,为了理清逻辑,这里就不贴其他方法了,小伙伴们可以自行查看源码。我们来看一下 submit
方法,在 2 个重载版本中,其先调用了 newTaskFor(task)
方法来得到一个 RunnableFuture
对象,我们可以看到 newTaskFor(task)
方法其实返回的是一个 FutureTask
对象,RunnableFuture
其实是一个接口,其源码如下:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
这个接口继承了 Runnable
接口和 Future
接口,Future
接口上篇文章中已经介绍过了。因此其对象既可以作为 Runnable
对象来给 execute(Runnable task)
提供参数,也可以作为 submit
的返回值。事实上,这个接口对象封装了两个对象,分别是要执行的任务对象(Runnable
)和保存任务的执行状态信息对象(Future
),而这个接口的实现类 FutureTask
则是实现了 RunnableFuture
接口中的方法,在其的 run()
方法中会调用创建 FutureTask
对象时传入的 Callable
对象的 call()
方法或者是 Runnable
对象的 run()
方法。
关于这个类的更多信息,具体可以看一下 FutureTask
的源码 。
我们继续:两个 submit
方法都通过 execute
方法来向线程池中提交任务,而 AbstractExecutorService
类中并没有实现 execute
方法,那么我们就只好去其子类 —— ThreadPoolExecutor
类中寻找答案了:
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 代表线程池运行状态的常量
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 包装和解包 ctl 变量的方法,通过这些方法来提取线程池的一些状态信息:
// 获取线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池的当前线程总数
private static int workerCountOf(int c) { return c & CAPACITY; }
// ...
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// ctl 是一个 AtomicInteger 类型的对象,即为原子类,
// 可以将 ctl 理解成保存了线程池的线程数、运行状态等信息的变量,
// 通过对应的方法可以提取出对应的信息,
// 比如 workerCount(ctl) 方法可以得到当前线程池中的线程总数
int c = ctl.get();
// 检测当前线程池的核心线程数是否饱和,如果没有,那么创建新的核心线程,
// 并且将当前任务作为该核心线程的第一个执行任务
if (workerCountOf(c) < corePoolSize) {
// 如果成功创建了核心线程处理任务,方法返回
if (addWorker(command, true))
return;
// 防止在这个过程中又有新的任务提交了造成错误,于是需要再次获取检查一次变量值
c = ctl.get();
}
// 如果当前线程池处于运行状态,并且任务成功的添加到任务队列
if (isRunning(c) && workQueue.offer(command)) {
// 同样的道理,为了防止添加任务到任务队列中又有新的任务提交造成错误,再次更新变量值
int recheck = ctl.get();
// 如果线程池不处于运行状态(shutdown、stop),
// 并且将刚添加的任务成功从任务队列移除,执行饱和策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果创建非核心线程执行任务失败,那么证明整个线程池的线程数达到最大线程数、任务队列已满,
// 或者是调用了线程池的 shutdown() 方法,拒绝接受任何新的任务,此时应该调用饱和策略
else if (!addWorker(command, false))
reject(command);
}
// ...
}
为了方便你理解这个方法,我在这个方法的源码中添加了中文注释,通过注释相信你已经能够理解这个方法的大致流程了,我们注意到 execute
方法中多次通过 addWorker
方法来添加线程处理任务,我们下面来看一下 addWorker
方法:
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
// 添加新的线程来将 firstTask 任务作为第一任务执行,执行完成之后执行线程池任务队列中其他任务,
// core 参数为是否添加核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 记录当前线程池运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池已经被停止或者关闭等,那么返回 false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池中线程数
int wc = workerCountOf(c);
// 如果线程数大于线程池最大线程或者想添加核心线程来处理 firstTask 但是核心线程已经饱和
// 或者添加非核心线程来处理 firstTask 但是线程池总线程数已达到饱和,返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果保存线程池状态信息的变量没发生变化,证明这个过程线程池是没有发生状态变化的,
// 此时将 ctl 值更新,使得其包装的线程池工作线程数信息加一
if (compareAndIncrementWorkerCount(c))
break retry;
// 防止这个过程中又有其他操作进行造成错误,再次读取 ctl 的值
c = ctl.get(); // Re-read ctl
// 如果线程池当前运行状态和线程池前面的运行状态不等,证明线程池状态发生改变,
// 调到外层循环重新执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 添加新的工作对象来处理任务,
// Worker 对象为 任务--线程 的包装类,创建该对象时会创建一个新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 同步块
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 检测当前线程池运行状态和方法参数是否合法
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 工作集中添加新建的线程包装类, workers 为一个 HashSet<Woker> 的对象
// 保存的是线程池中的所有 Worker 对象
workers.add(w);
int s = workers.size();
// 更新线程池中出现过的最大的线程数
if (s > largestPoolSize)
largestPoolSize = s;
// 工作添加标志置为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动新建的线程,执行任务
t.start();
// 线程启动标志置为 true
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
同样的将注释写在了关键代码上,在创建新的线程处理 firstTask
任务时通过新建 Worker
对象来完成,我们来简单看一下 Worker
类,这是一个 ThreadPoolExecutor
类的内部类:
为了不陷入源码循环中,这里只贴出了为了理解上面方法的必要代码,可以看到,这个类中封装了要执行的任务和执行这个任务的 Thread
线程对象,并且在创建线程时,传入了 this
参数,那么创建的线程在执行时就会调用这个对象的 run
方法,即调用 runWorker()
方法。
思路越来越清晰了,我们分析到这里其实已经可以得出结论了:线程池中储存的并不是 Thread
对象,而是封装过后的 Worker
对象,并且每一次新建一个 Worker
对象时都会把这个对象存入 workers
集合中,workers
其实 ThreadPoolExecutor
类的一个 HashSet<Worker>
集合类型的成员变量。
我们再来看一下 runWorker()
方法,注意,这个方法是 ThreadPoolExecutor
类中的:
在这个方法中线程会不断从线程池的任务队列中取任务并执行执行任务,直到取出的任务对象为 ,此时证明线程池已经关闭或者任务队列为空,这样的话证明当前线程可以被尝试回收,接下来就会跳出 while
循环进入 finally
语句块中执行来尝试回收线程:
Ok,下面我们来看一下线程怎么通过 getTask()
方法从任务队列中取出任务:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 死循环,方法要么返回 null,要么返回 Runnable 对象代表线程取到了任务
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程处于 STOP 以上(STOP、TIDYING、TERMINAL)的状态或者任务队列为空,
// 证明线程池中已经没有任务应该被处理并且线程池应该被关闭,
// 因此将保存线程池信息的变量的保存的线程数信息减一,并且结束方法返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果线程池中的线程数大于最大线程数或者允许核心线程被回收或者存在非核心线程并且任务队列为空,
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 则将保存线程池信息的变量的保存的线程数信息减一,证明当前线程可以被回收并且返回 null
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 线程去阻塞队列中取任务对象,
// 如果 timed 为 true ,调用 poll() 方法,线程只会阻塞不大于 poll() 方法参数指定时间,
// 此时如果过了这个时间还没有取到任务,那么 pool() 方法返回 null,线程就会解除阻塞状态。
/// 如果 timed 为 false,那么会调用 take() 方法,此时线程会永久阻塞直到该方法有任务对象返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果线程在阻塞队列中取到了任务,则返回,否则设置 timedOut 为 true,
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
好了,到了这里,我们终于把线程池提交任务和执行任务的整个流程看完了。过程中涉及到的方法有点多,我们用一张图来描述这个流程:
好了,了解了整个线程池中的工作流程之后,我们再回到文章开头,我们来看一下 Executors
类给我们提供的常用的线程池:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
可以看到 FixedThreadPool
的核心线程数和最大线程数是相同的,也就是说 FixedThreadPool
只有核心线程,不存在非核心线程。因为没有非核心线程,那么第三个和第四个参数设置非核心线程的最大空闲时间就没有了作用(除非设置了 allowCoreThreadTimeOut 属性为 true),接下来是其任务队列,用的是 LinkedBlockingQueue
有界阻塞队列。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到,这个线程池只有一个核心线程来处理任务,任务队列也是使用的 LinkedBlockingQueue
有界阻塞队列。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这个线程池不使用核心线程,而对非核心线程的数量为 Integer.MAX_VALUE
,即为不限制非核心线程的数量,
同时,每个线程的过期时间为 60 秒,任务队列采用的是 SynchronousQueue
,这个队列不储存元素,正好符合这个线程池的特性。该线程池适用于处理大量需要立即处理并且每个任务耗时较少的任务集合。
常用的线程池就介绍到这里了,对于一些其他的线程池的用法,小伙伴们可以自行阅读相关资料。
好了, 这篇文章中我们一起看了一下 Java 中的线程池,并且从源码的角度上将线程池的工作流程过了一遍,
内容有点多,但其实还有些内容没有介绍,比如说 Java 中常见的线程池(FixedThreadPool
、ChachedThreadPool
、SingalThreadExecutor
、ScheduledThreadPool
等)的源码运行流程,如果你理解了这篇文章的内容,那么对于这些常用的线程池完全可以自己看源码来了解其用法。
下一篇文章也应该是本专栏 Java 多线程板块的最后一篇了,在下篇文章中将会介绍线程组的相关知识点和对整个 Java 多线程板块进行一个总结。 如果博客中有什么不正确的地方,还请多多指点。如果这篇文章对您有帮助,请不要吝啬您的赞,欢迎继续关注本专栏。
谢谢观看。。。