今天有个朋友提了一个问题,模拟代码如下:
public class ThreadPoolDemo {
public static void main(String[] args) {
int nThreads = 10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
executorService.execute(() -> System.out.println("test"));
}
}
运行结束后发现程序“阻塞”了。
可以看到程序还在运行中。
JVM常见的退出原因有4种:
1、kill -9 pid 直接杀死进程
2、java.lang.System.exit(int status)
3、java.lang.Runtime.exit(int status)
4、没有非守护线程存活
那么我们回到上面的问题,分析为啥程序没结束。
我们查看定长线程池的构造函数
java.util.concurrent.Executors#newFixedThreadPool(int)
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
通过注释我们发现该线程池核心线程和最大线程数相同,工作队列为无界队列。
如果所有的核心线程都在执行任务,那么任务就会放到工作队列。如果执行过程中一个线程挂掉了,就会新建一个线程池来执行后续的任务。线程池中的线程将会一直存在,直到调用了ExecutorService#shutdown函数。
我们再看底层的ThreadPoolExecutor的构造函数
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue)
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
根据注释我们可以看出几个核心参数的含义:
第1个参数:corePoolSize: 核心常驻线程池。如果等于0,任务执行完,没有任何请求进入则销毁线程;如果大于0,即使本地任务执行完毕,核心线程池也不会被销毁。这个参数设置非常关键设置过大浪费资源,设置过小导致线程频繁创建或销毁。
第2个参数:maximumPoolSize表示线程池能够容纳同时执行的最大线程数。
如果线程池中的线程数大于核心线程数且队列满了,且线程数小于最大线程数,则会创建新的线程。 第3个参数:keepAliveTime表示线程池中的线程空闲时间,当空闲时间达到keepAliveTime值时,线程会被销毁,直到只剩下corePoolSize个线程为止,避免浪费内存和句柄资源。
具体参见:java.util.concurrent.ThreadPoolExecutor#execute的注释部分。
在默认情况下,当线程池的线程数大于corePoolSize时,keepAliveTime才会起作用。
但是当ThreadPoolExecutor的allowCoreThreadTimeOut变量设置为true时,核心线程超时后也会被回收。
第4个参数: TimeUnit表示时间单位。keepAliveTime 的时间单位通常是TimeUnit.SECONDS。
第5个参数: workQueue 表示缓存队列。当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue阻塞队列。
第6个参数: threadFactory 表示线程工厂。它用来生产一组相同任务的线程。线程池的命名是通过给这个factory增加组名前缀来实现的。在虚拟机栈分析时,就可以知道线程任务是由哪个线程工厂产生的。
第7个参数: handler 表示执行拒绝策略的对象。当超过第5个参数workQueue的任务缓存区上限且线程达到了maximumPoolSize的时候,就可以通过该策略处理请求。
这里默认的拒绝策略是抛出RejectedExecutionException异常
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
源码:
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
我们再次回归问题本身,我们分析一下代码:
public static void main(String[] args) {
int nThreads = 10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
executorService.execute(() -> System.out.println("test"));
}
线程池只执行了1次任务,而核心线程池和最大线程池都是10,因此第一个任务提交时需要创建1个线程来执行,当任务执行完毕,没有新的任务进来,但是核心线程池是不超时的,因此这个线程会一直“活着”等待任务。
核心线程池默认不超时的依据:
java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
为了更好地理解我们改编一下代码:
public static void main(String[] args) {
// 定义一个任务
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test");
};
// 长度为10的定长线程池
int nThreads = 10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
// 给线程池起个名字
executorService.setThreadFactory(new NamedThreadFactory("定长线程池"));
// 执行两次任务(第二次执行时第一次还没结束)
executorService.execute(runnable);
executorService.execute(runnable);
// 活跃线程数
System.out.println(executorService.getActiveCount());
}
注意为了效果更明显,这里让任务停顿了20秒钟,并给线程池起了个名字。
根据上面的知识点,我们推测一下流程:
主线程创建线程池,线程池执行第一个任务(和上面一样),线程池执行第二个任务(此时第一个线程sleep 20秒)由于未达到核心线程数10,因此会创建第二个线程来执行第二个任务,第二个任务也sleep 20秒,此时主线程打印线程池的活跃线程数(正在执行任务的线程)此时应该为2个。
结果和设想的一样。
那么我们我们如何看是该线程池否有两个线程呢?
我们使用VisualVM查看该程序:
发现前我们创建两个线程先执行(时间可忽略)立即进入Sleeping ,然后Runnable状态然后执行(控制台打印了“test”,时间太短可界面都无法显示),然后进入WAITING状态
如图所示
通过线程dump我们可以看出线程从LinkedBlockingQueue取任务的时候阻塞了
java.util.concurrent.LinkedBlockingQueue#take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
在这一行:notEmpty.await(); 将当前线程阻塞,底层用了java.util.concurrent.locks.LockSupport#park(java.lang.Object)。
感兴趣大家可以去看看 java.util.concurrent.locks.LockSupport#park(java.lang.Object)的用法和注释。
因此此线程池的两个核心线程一直存在并等待任务进入阻塞队列从而继续处理。
我们还可以再加一个任务来验证我的设想
public static void main(String[] args) throws InterruptedException {
// 定义一个任务
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test");
};
// 长度为10的定长线程池
int nThreads = 10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
// 给线程池起个名字
executorService.setThreadFactory(new NamedThreadFactory("定长线程池"));
// 执行两次任务(第二次执行时第一次还没结束)
executorService.execute(runnable);
executorService.execute(runnable);
// 活跃线程数
System.out.println(executorService.getActiveCount());
TimeUnit.SECONDS.sleep(5L);
executorService.execute(runnable);
}
大家思考线程执行的状态,并通过VisualVM动态地观察效果。
通过上面的介绍我们知道,因为核心线程池不超时所以创建的核心线程一直存活,核心线程池阻塞的原因是从阻塞队列中取数据时被阻塞队列阻塞掉了。
由于有非守护线程一直存活所以虚拟机不会退出,因此程序也不会结束。
可能有人会说“线程池执行完任务都不会销毁的”,是吗?看看下面的例子:
那么我们再看一下下面的程序执行会怎样?
public static void main(String[] args) throws InterruptedException {
int nThreads =10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(nThreads);
// 允许核心线程池超时,超时时间为2s
executorService.setKeepAliveTime(2L, TimeUnit.SECONDS);
executorService.allowCoreThreadTimeOut(true);
executorService.execute(()-> System.out.println("test"));
}
执行后发现打印完test以后,等待2s没有任务,核心线程池的线程销毁,由于没有非守护线程,虚拟机退出(exit code 0)。
我们还可以通过断点来学习线程池的各种属性,并观察运行状态等。
public static void main(String[] args) throws InterruptedException {
// 定义一个任务
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test");
}; // 长度为10的定长线程池
int nThreads = 10;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads); // 给线程池起个名字
executorService.setThreadFactory(new NamedThreadFactory("定长线程池")); // 执行两次任务(第二次执行时第一次还没结束)
executorService.execute(runnable);
executorService.execute(runnable);
// 活跃线程数
System.out.println(executorService.getActiveCount());
}
我们在打印语句处断点,注意断点是只选择Thread:
否则会断住所有线程。
效果如下:
可以看到是否允许核心线程超时,完成的任务数,可以查看workers来查看工作的线程状态等。
还可以查看等待的条件和等待队列等信息:
学习并发可以多用调试,多种学习手段相结合,效果更好。
我们发现执行任务的线程被封装成了线程池的Worker对象:
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet workers = new HashSet();
java.util.concurrent.ThreadPoolExecutor.Worker
继承自AbstractQueuedSynchronizer (AQS)并实现了Runnable接口。
感兴趣大家可以看源码,根据调试信息等深入学习。