不光是线程池,池化思想在诸多地方有着很好的应用,比如对象池、连接池等等。。一般运用池化思想的都是一些比较消耗系统资源的操作,通过池化,可以降低内存消耗,并且可以进行复用操作,提高效率。同时池化还可以统一的对资源进行管理,控制他们的创建与销毁。
首先最简单的方式,是使用juc包提供的Executors进行创建,这个类为我们提供了几种比较简单的线程池对象
但是但是但是
《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
答案很简单,通过上面的源码就可以看到为什么:
综上所述,不推荐使用Executors创建线程,当然上面说的其实只是一部分,阿里的规范其实更多的是希望程序员自己直接使用ThreadPoolExecutor创建线程池,是为了让程序员可以自己完全控制掌握自己的线程池。
仔细看看上面Executors提供的几个常用的线程池,看到他们的实现,其实都是调用了new ThreadPoolExecutor(xxxxx),只是参数上做了些小小的区别。所以最正确的创建方式就是直接使用ThreadPoolExecutor类进行线程池创建。
查看ThreadPoolExecutor源码可以看到,ThreadPoolExecutor提供了四个构造方法
但是前三个都是弟弟,因为他们都套娃,一个一个调用,最后实际都是调用的第四个构造方法
下面着重开始介绍ThreadPoolExecutor类
说句实话,作为一个基础非常不扎实的小白,,明明是中国人,明明是中国字,但我咋看不懂呢。。。
然后看到了大神举了一个非常生动的例子,瞬间跪了
首先有一个银行(线程池),你是幕后boss(程序员),可以办理业务,平时没啥人,就是三个窗口办业务(这里每个业务窗口就是线程,3个就是corePoolSize)。有天办业务的人有点多,三个柜台都满了,大堂经理就安排新来的客户在候客区等待(把任务塞进阻塞队列),鬼知道什么情况,把业务的人越来越多,候客区也满了,经理就只好把在家休息的小王小李。。。都叫来加班,又开了2个窗口(现在一共5个窗口,是银行柜台的上限,就是maximumPoolSize),现在有五个窗口同事进行业务办理工作,再后来五个窗口满了,候客区也满了,新来的客户坐不下了,咋办?为了维持银行秩序,经理必须想几个办法改善现状(这里想的几个办法就是饱和策略,具体的饱和策略下面有介绍)
其实这个例子里面也包含了ThreadPoolExecutor的执行原理,先有个印象
这上面这个例子里面可以很清晰的知道几个核心参数的含义(除了keepAliveTime、unit、threadFactory),所以单独说一下
通过查看execute方法的源代码,可以很清晰的看到这些执行流程(jdk也很清楚地在注释中写了这个执行过程)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 比较工作线程和核心线程池大小
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 尝试将任务放入缓存队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试创建新的线程
else if (!addWorker(command, false))
// 上述都失败了,就执行拒绝策略
reject(command);
}
12345678910111213141516171819202122232425262728293031323334353637383940414243
ThreadPoolExecutor中没有重写父类AbstractExecutorService的submit方法
JavaGuide里面有一个很好地描述:
线程池有五大生命周期
线程池生命周期的转换图如下:
代码内部是如何维护的呢?
ThreadPoolExecutor内部维护了有一个成员变量ctl,他是一个AtomicInteger类型的变量。它是对线程池运行状态和线程池中有效线程数量进行控制的字段,Integer值一共有32位,其中高3位表示”线程池状态”,低29位表示”线程池中的任务数量”。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 通过位运算获取线程池运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 通过位运算获取线程池中有效的工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 初始化ctl变量值
private static int ctlOf(int rs, int wc) { return rs | wc; }
123456789101112131415161718
为什么用一个Integer来维护两个值
这里的解析来自一个大神的博文,我做了一个整理,添加了我自己的一些见解:http://objcoding.com/2019/04/25/threadpool-running/
通过查看execute源码可以看到当线程池有余时会调用addWorker方法,创建线程执行方法,那么是如何进行的呢?继续向下探索
可以简单讲addWorker方法分为两个部分,一个是上面的一堆判断,然后就是主菜创建Worker
先看前部分,可以看出前部分是一个for循环,这个for循环的主要作用就是判断当前线程池的状态可不可以添加任务,特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。
retry:
for (;;) {
int c = ctl.get();
// 获取线程池当前运行状态
int rs = runStateOf(c);
// 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
// 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务
// 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取有效线程数量
int wc = workerCountOf(c);
// 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
// 或者有效线程数大于等于当前限制的线程数,也不能添加任务
// 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
if (wc >= CAPACITY ||
// core是addWorker的第二个参数
// 通过此处可以看出,core为true时,会判断当前有效线程的数量与核心线程数进行比较
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用AQS增加有效线程数量
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果再次获取ctl变量值
c = ctl.get(); // Re-read ctl
// 再次对比运行状态,如果不一致,再次循环执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
12345678910111213141516171819202122232425262728293031323334353637
下半部分
下半部分的源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了,继续往下看。
// 任务是否已执行
boolean workerStarted = false;
// 任务是否已添加
boolean workerAdded = false;
// 任务包装类,我们的任务都需要添加到Worker中
Worker w = null;
try {
// 创建一个Worker
w = new Worker(firstTask);
// 获取Worker中的Thread值
final Thread t = w.thread;
if (t != null) {
// 操作workers HashSet 数据结构需要同步加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池的运行状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
// rs是RUNNING状态时,直接创建线程执行任务
// 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也说说明了SHUTDOWN状态时不再接受新任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
可以很清晰的看到,当worker被创建好后直接丢到了workers中,然后进行了t.start操作,直接开始执行线程。然而t是worker的成员变量,所以具体如何执行的呢?继续向下看
上述的addWorker中大量的出现了worker对象,那么到底什么是worker对象呢,一起来看看他的源码。
可以看出worker是threadpoolexecutor的内部类,有两个成员变量,firstTask和thread,通过他的构造方法可以看出,fristTask是在创建对象时被初始化的。
这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。
Worker继承自AQS(这个后面开坑再讲)实现了Runnable接口,核心就是这个run方法,所以最最根本的线程任务执行就是run方法,可以看到Worker重写的run方法里面调用了runWorker这个方法传入了当前对象。继续往下看
runWorker就更加的简单了,首先是取得worker对象的firstTask,这是初次情况,firstTask不为空,进入while循环会进行判断,初次直接使用firstTask,随后都是调用的getTask进行取任务。
下面的代码中注释的地方写的非常详细,有两个地方需要注意下 1. beforeExecute 2. afterExecute
这两个方法是ThreadPoolExecutor特意保留的两个方法,默认是没有实现的,他运行程序员对他进行实现,帮助我们完成一些线程执行时的一些小操作~
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环从workQueue阻塞队列中获取任务并执行
while (task != null || (task = getTask()) != null) {
// 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题
w.lock();
// 如果线程池正在关闭,须确保中断当前线程(这里就是调用shutdownnow会使正在执行中的线程结束的代码)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前可以做一些操作
beforeExecute(wt, task); // 注意这个方法
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后可以做一些操作
afterExecute(task, thrown); // 注意这个方法
}
} finally {
// 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务
task = null;
// 记录Worker执行了多少次任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程回收过程
processWorkerExit(w, completedAbruptly);
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
再看这里的解析的时候,我同样搜集了很多网上的解释,稍微有点点晦涩,我按照我的理解从新写了一段
首先问题是如何保证核心线程不被销毁,那么先看看什么情况下会被销毁?
没错就是上面这段runWorker()中的最后,当while循环退出时,会执行processWorkerExit操作这个操作就是销毁的核心
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 可以看到这里将worker对象从set中移除,销毁了当前的工作线程对象
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
12345678910111213141516171819202122232425262728
按照这个逻辑,必须要退出while循环的时候,他才会进行销毁,那么怎样才能退出while循环呢,再看看while循环的判断
task != null || (task = getTask()) != null
1
||左边不多说,主要看右边,当getTask获取不到对象的时候才会为false,所以控制它的关键在于getTask方法
重点研究一下(这里我看了另一个不错的博客:https://www.jianshu.com/p/8848860b9ad4),原作者对于getTask的代码做了一个简化分析
// 为分析而简化后的代码
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (timed && timedOut) {
// 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
// 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 注意workQueue中的poll()方法与take()方法的区别
//poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
//take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1234567891011121314151617181920212223242526272829303132333435363738
从以上代码可以看出,getTask()的作用是:
所以allowCoreThreadTimeOut可以控制核心线程是否进行销毁
ThreadPoolExecutor提供了两个方法进行线程池关闭操作:shutdown和shutdownNow
在上面的线程池生命周期中可以很好地看出这两个操作的区别
shutdown只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务(包含阻塞队列中等待的任务)会继续执行下去,没有被执行的则中断。
而shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
ThreadPoolExecutor自带了4种饱和策略:
前面三个好理解一点,给第四个举个例子,上demo:
public class TestThreadPoolExecutor {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, // corePoolSize
5, // maximumPoolSize
1, // 过期时间
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1), // 阻塞队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 20; i++) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500); // 让线程处理的久一点
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread());
}
});
}
threadPoolExecutor.shutdown();
}
}
12345678910111213141516171819202122232425262728
输出:
通过上面的例子可以看出,线程池任务满载的时候,一部分任务转由调用者执行(主线程)
CPU 密集型任务
计算密集型,顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务
对于IO密集型的应用,就很好理解了,**我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。**因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。