1. 什么是线程池?
线程池: 简单理解,它就是一个管理线程的池子。
之前我们有一个和第三方对接的需求,需要向第三方推送数据,引入了多线程来提升数据推送的效率,其中用到了线程池来管理线程。
主要代码如下:
完整可运行代码地址:https://gitee.com/fighter3/thread-demo.git
线程池的参数如下:
同时还用了synchronized 来加锁,保证数据不会被重复推送:
synchronized (PushProcessServiceImpl.class) {}
ps:这个例子只是简单地进行了数据推送,实际上还可以结合其他的业务,像什么数据清洗啊、数据统计啊,都可以套用。
用一个通俗的比喻:
有一个营业厅,总共有六个窗口,现在开放了三个窗口,现在有三个窗口坐着三个营业员小姐姐在营业。
老三去办业务,可能会遇到什么情况呢?
上面的这个流程几乎就跟 JDK 线程池的大致流程类似,
所以我们线程池的工作流程也比较好理解了:
此值是用来初始化线程池中核心线程数,当线程池中线程池数< corePoolSize
时,系统默认是添加一个任务才创建一个线程池。当线程数 = corePoolSize时,新任务会追加到workQueue中。
maximumPoolSize
表示允许的最大线程数 = (非核心线程数+核心线程数),当BlockingQueue
也满了,但线程池中总线程数 < maximumPoolSize
时候就会再次创建新的线程。
非核心线程 =(maximumPoolSize - corePoolSize ) ,非核心线程闲置下来不干活最多存活时间。
线程池中非核心线程保持存活的时间的单位
线程池等待队列,维护着等待执行的Runnable
对象。当运行当线程数= corePoolSize时,新的任务会被添加到workQueue
中,如果workQueue
也满了则尝试用非核心线程执行任务,等待队列应该尽量用有界的。
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。
corePoolSize
、workQueue
、maximumPoolSize
都不可用的时候执行的饱和策略。
类比前面的例子,无法办理业务时的处理方式,帮助记忆:
想实现自己的拒绝策略,实现RejectedExecutionHandler接口即可。
常用的阻塞队列主要有以下几种:
threadsPool.execute(new Runnable() {
@Override public void run() {
// TODO Auto-generated method stub }
});
Future<Object> future = executor.submit(harReturnValuetask);
try { Object s = future.get(); } catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池 executor.shutdown();
}
可以通过调用线程池的shutdown
或shutdownNow
方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
shutdown 和shutdownnow简单来说区别如下:
shutdownNow()能立即停止线程池,正在跑的和正在等待的任务都停下了。这样做立即生效,但是风险也比较大。shutdown()只是关闭了提交通道,用submit()是无效的;而内部的任务该怎么跑还是怎么跑,跑完再彻底停止线程池。
线程在Java中属于稀缺资源,线程池不是越大越好也不是越小越好。任务分为计算密集型、IO密集型、混合型。
Runtime.getRuntime().availableProcessors();
当然,实际应用中没有固定的公式,需要结合测试和监控来进行调整。
主要有四种,都是通过工具类Excutors创建出来的,阿里巴巴《Java开发手册》里禁止使用这种方式来创建线程池。
前三种线程池的构造直接调用ThreadPoolExecutor的构造方法。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
线程池特点
工作流程:
适用场景
适用于串行执行任务的场景,一个任务一个任务地执行。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
线程池特点:
工作流程:
使用场景
FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
线程池特点:
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
工作流程:
适用场景
用于并发执行大量短期的小任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
线程池特点
工作机制
使用场景
周期性执行任务的场景,需要限制线程数量的场景
例如newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长,会导致队列的任务越积越多,导致机器内存使用不停飙升,最终导致OOM。
在使用线程池处理任务的时候,任务代码可能抛出RuntimeException,抛出异常后,线程池可能捕获它,也可能创建一个新的线程来代替异常的线程,我们可能无法感知任务出现了异常,因此我们需要考虑线程池异常情况。
常见的异常处理方式:
线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
//线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池各个状态切换图:
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED
线程池提供了几个 setter方法来设置线程池的参数。
这里主要有两个思路:
线程池配置没有固定的公式,通常事前会对线程池进行一定评估,常见的评估方案如下:
上线之前也要进行充分的测试,上线之后要建立完善的线程池监控机制。
事中结合监控告警机制,分析线程池的问题,或者可优化点,结合线程池动态参数配置机制来调整配置。
事后要注意仔细观察,随时调整。
具体的调优案例可以查看参考[7]美团技术博客。
⭐这道题在阿里的面试中出现频率比较高
线程池实现原理可以查看 要是以前有人这么讲线程池,我早就该明白了! ,当然,我们自己实现, 只需要抓住线程池的核心流程-参考[6]:
我们自己的实现就是完成这个核心流程:
实现代码[6]:
public class MyThreadPoolExecutor implements Executor {
//记录线程池中线程数量
private final AtomicInteger ctl = new AtomicInteger(0);
//核心线程数
private volatile int corePoolSize;
//最大线程数
private volatile int maximumPoolSize;
//阻塞队列
private final BlockingQueue<Runnable> workQueue;
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
}
/**
* 执行
*
* @param command
*/
@Override
public void execute(Runnable command) {
//工作线程数
int c = ctl.get();
//小于核心线程数
if (c < corePoolSize) {
//添加任务失败
if (!addWorker(command)) {
//执行拒绝策略
reject();
}
return;
}
//任务队列添加任务
if (!workQueue.offer(command)) {
//任务队列满,尝试启动线程添加任务
if (!addWorker(command)) {
reject();
}
}
}
/**
* 饱和拒绝
*/
private void reject() {
//直接抛出异常
throw new RuntimeException("Can not execute!ctl.count:"
+ ctl.get() + "workQueue size:" + workQueue.size());
}
/**
* 添加任务
*
* @param firstTask
* @return
*/
private boolean addWorker(Runnable firstTask) {
if (ctl.get() >= maximumPoolSize) return false;
Worker worker = new Worker(firstTask);
//启动线程
worker.thread.start();
ctl.incrementAndGet();
return true;
}
/**
* 线程池工作线程包装类
*/
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
this.thread = new Thread(this);
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
try {
//执行任务
while (task != null || (task = getTask()) != null) {
task.run();
//线程池已满,跳出循环
if (ctl.get() > maximumPoolSize) {
break;
}
task = null;
}
} finally {
//工作线程数增加
ctl.decrementAndGet();
}
}
/**
* 从队列中获取任务
*
* @return
*/
private Runnable getTask() {
for (; ; ) {
try {
System.out.println("workQueue size:" + workQueue.size());
return workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//测试
public static void main(String[] args) {
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 2,
new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
int taskNum = i;
myThreadPoolExecutor.execute(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务编号:" + taskNum);
});
}
}
}
这样,一个实现了线程池主要流程的类就完成了。
我们可以对正在处理和阻塞队列的任务做事务管理或者对阻塞队列中的任务持久化处理,并且当断电或者系统崩溃,操作无法继续下去的时候,可以通过回溯日志的方式来撤销正在处理
的已经执行成功的操作。然后重新执行整个阻塞队列。
也就是:阻塞队列持久化;正在处理任务事务控制;断电之后正在处理任务的回滚,通过日志恢复该次操作;服务器重启后阻塞队列中的数据再加载。
参考:
[1]. 《Java并发编程的艺术》
[2]. 《Java发编程实战》
[3]. 讲真 这次绝对让你轻松学习线程池
[4]. 面试必备:Java线程池解析
[5]. 面试官问:“在项目中用过多线程吗?”你就把这个案例讲给他听!
[6]. 小傅哥 《Java面经手册》