线程池是一种线程复用的技术,它可以有效地控制线程的数量,处理过程中将任务添加到队列,然后在线程创建后启动这些任务。主要作用有:
主要涉及到如下原理:
JDK 中提供的 ThreadPoolExecutor 实现了线程池,可以灵活地设置各项参数。usage 如下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, //核心线程数
5, //最大线程数
60L, //空闲线程存活时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3), //任务队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //拒绝策略
);
executor.execute(() -> {
//任务逻辑
});
使用线程池可以有效地管理大量短生命周期的线程,节省频繁创建和销毁线程的开销。
JDK 提供的线程池功能较基础,开源框架 Executors 在此基础上做了许多封装,更加易用。主要有:
Usage 如下:
//创建一个可缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
//创建一个定长线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
//创建一个周期性执行的线程池
ExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
//创建一个单线程化的线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
这些线程池都实现了ExecutorService接口,使用方法与JDK ThreadPollExecutor类似,可以使用submit()/execute()提交任务,使用shutdown()关闭线程池。
在SpringBoot中,可以通过下面配置自定义线程池:
spring:
task:
execution:
pool:
core-size: 10 #核心线程数
max-size: 20 #最大线程数
queue-capacity: 200 #队列容量
thread-name-prefix: task- #线程名前缀
然后在程序中可以通过注入TaskExecutor使用线程池:
@Autowired
private TaskExecutor taskExecutor;
public void doSomething(){
taskExecutor.execute(() -> {
//任务逻辑
});
}
SpringBoot使用的线程池实现也是JDK中的ThreadPoolExecutor,只是进行了封装和配置化。我们可以按需灵活配置线程池各参数,满足不同的业务需求。
线程池的配置参数较多,如何选取合理的配置是一个值得探讨的问题。主要的配置参数有:
这里给出一些合理配置的思路:
除此之外,对于任务执行时间较长的场景,不建议使用过大的线程池,否则可能导致线程积压,反而带来效率损失。线程池的配置还是需要根据具体业务场景进行针对性调优。
JDK 线程池的拒绝策略只有 AbortPolicy(默认)、CallerRunsPolicy 和 DiscardPolicy 三种。还有其他的拒绝策略,可以根据需要进行选择:
一般来说,对于非关键任务可以选择DiscardPolicy、DiscardOldestPolicy等丢任务策略;对于关键任务则需要选择CallerRunsPolicy、RejectExecutionHandler等不会丢失任务的策略。AbortPolicy只适用于适当配置线程池,不会出现拒绝的情况下。
为了维护线程池的稳定性,我们需要对其进行监控。主要监控内容有:
除了监控之外,我们还需要对线程池进行定期维护,主要方面有:
综上,维护一个高性能且稳定的线程池还是需要全面考虑的。只有同时兼顾到线程池的监控、参数调优和日常维护,才能保证其长期高效运行。
使用线程池中一些常见的问题,我们需要在开发和运维中重点关注。
除此之外,也要考虑到线程池在高并发场景下的稳定性问题,需要对其进行压力测试,观察触发OOME等异常的并发阈值,并作出相应优化。总之,webp应用中使用线程池还是需要全面考虑,这也是一个值得深入研究的内容。
我们来看一个简单的线程池实现。它包含:
public class SimpleThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程列表
private List<Worker> workers = new ArrayList<>();
//线程池监控
private Monitor monitor;
public SimpleThreadPool(int coreSize, int maxSize, BlockingQueue<Runnable> queue) {
taskQueue = queue;
monitor = new Monitor();
for (int i = 0; i < coreSize; i++) {
Worker worker = new Worker();
workers.add(worker);
worker.start();
}
}
//提交任务
public void execute(Runnable task) {
if (task != null) {
taskQueue.add(task);
monitor.incrementSubmitCount();
}
}
//关闭线程池
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
class Worker extends Thread {
@Override
public void run() {
while (!isInterrupted()) {
Runnable task = null;
try {
task = taskQueue.take();
task.run();
monitor.incrementCompleteCount();
} catch (InterruptedException e) {
break;
}
}
}
public void shutdown() {
interrupt();
}
}
}
这个简单的线程池实现了基本的提交任务、关闭线程池和监控线程池运行状态的功能。虽然还不及JDK线程池功能完备,但已包含了线程池的基本构成与运作过程,希望这可以帮助大家进一步理解线程池的原理。
如果要将这个简单线程池应用在实际项目中,还需要考虑到线程池容量动态调整、异常处理、清理工作线程等更丰富的功能,不过 starters 可以从简单的实现开始,逐步丰富与增强。
在实际工程中,我们需要提供一个可运维的线程池方案,方便进行监控、参数调优和问题排查。这里给出一个方案思路:
以上方案还需要具体实现与配套的运维系统才能发挥最大效果。但提供一个开放、可监控、可运维的线程池框架已是实际工程中比较理想的状态。这至少可以大大减轻工程师在使用线程池及其优化时的难度,提高系统的稳定性。
理解了线程池的原理与实现后,我们还需要结合实际场景去灵活运用它。这里分析几个较典型的使用场景和案例:
案例1:网站高并发场景下的使用。可以使用线程池预先创建一定数量的工作线程,将用户请求作为任务提交给线程池执行,以此来达到异步处理用户请求的效果,从而提高系统吞吐量。
案例2:批量数据处理场景下的使用。可以使用线程池实现异步批量数据处理,主线程将数据列表拆分为多个批次,提交给线程池执行,而自己则继续前行其它操作。这可以最大限度地利用CPU资源,不会因为批量数据处理而长时间阻塞主线程。
案例3:定时任务的使用。我们可以 periodically 提交定时任务给线程池,由其中一个工作线程负责执行,然后等待下次调度。通过这种方式,实现定时任务的线程池实现,避免单个定时任务执行时间过长影响其他定时任务。
Netty 是一个异步事件驱动的网络应用框架,其线程模型和线程池的实现值得我们借鉴和学习。
Netty 的线程模型主要包含以下几个部分:
其基本工作流程是:
Boss线程接收连接请求并注册到Selector,Selector监听IO事件并将事件通知给Worker线程,Worker线程即刻处理IO事件,对连接进行业务处理。
在这个线程模型下,Netty 为不同的事件和任务划分出不同的线程来处理,避免单一类型任务对其它任务产生影响。并通过线程池的手段实现线程复用,合理控制系统资源。
Netty 中的线程池主要分为两种:
NioEventLoopGroup 通常作为 BossGroup 和 WorkerGroup 使用,来处理 I/O 事件和网络读写。DefaultEventExecutorGroup 常作为 Server 的后台业务线程池使用。
总之,Netty 通过线程模型和不同类型的线程池配合,实现了事件驱动、任务分类和线程复用,这为其能够高效并且可靠地运行于高并发环境奠定了基础。这也是我们在设计一个高性能服务器程序时值得学习的地方。
Netty 的线程模型和线程池的设计思想,对其他网络应用框架同样具有很好的借鉴意义。理解它们的工作机制有助于我们开发出一个高效、稳定的网络服务程序。
在使用线程池过程中,我们常需要判断其是否已经达到最大容量,以决定是否继续提交新任务。这里介绍几种判断线程池是否已满的方法:
if (executor.getActiveCount() == executor.getMaximumPoolSize()) {
// 线程池已满
} else {
// 线程池未满,可以提交新任务
}
try {
executor.submit(callableTask);
} catch (RejectedExecutionException e) {
// 线程池已满
}
executor.submit(new Runnable() {
public void run() {
if (executor.getPoolSize() == executor.getMaximumPoolSize()) {
// 线程池已满
} else {
// 运行任务逻辑
}
}
});
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isTerminated() || executor.isShutdown()) {
// 关闭状态,不再接收新任务
} else {
// 线程池已满,执行拒绝策略
}
}
});
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(0));
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
// 线程池已满
}
以上就是几种判断线程池是否已满的方法,可以根据实际场景选择一种或多种组合使用。理解线程池的运行机制有助于我们作出正确判断,选择最佳的判断方式。
合理地配置线程池的大小是使用线程池的一个关键点。如果线程池过小,无法满足任务需求,导致任务积压和系统吞吐量下降;如果线程池过大,会造成资源浪费和系统稳定性降低。那么如何计算一个合适的线程池大小呢?
这里介绍几点规划线程池容量的思考:
一般来说,
线程池coreSize可 initially 设置为:CPU 核数 + 1。
maximumSize 可设置为:coreSize * 2 或 coreSize * 3。
阻塞队列大小设置为:maximumSize / 2 或 maximumSize。
在此基础上,需要根据系统运行情况动态监控,进行参数调优:
综上,合理计算线程池大小是一个递归的过程,需要不断根据任务特性、系统资源以及监控信息进行评估和优化。但初始设置和动态调优又不能太极端,需要选取一个平衡值,这也是使用线程池并发设计的精髓与难点所在。
在 muitl-threading 编程中,回调机制是比较常用的一种机制。它可以在一个线程中启动某个任务,然后在该任务完成后在线程中得到通知,然后进行后续的一些处理。
使用线程池实现异步回调的一般步骤如下:
public interface Callback {
void onComplete(Result result);
}
executor.submit(new Task(), new Callback() {
public void onComplete(Result result) {
// ... 处理结果
}
});
public class Task implements Runnable {
private Callback callback;
// ...
@Override
public void run() {
Result result = doSomething();
callback.onComplete(result);
}
}
这个模式的应用场景是:主线程需要启动一些耗时任务,但又不能被这些任务的执行时间绑死,通过异步回调可以在任务完成后得到通知并进行必要的后续操作。
如服务器程序启动一段数据加载任务,加载完成后通知服务器程序数据加载完毕,然后开启服务器监听端口。如果没有异步回调,服务器线程会一直等待数据加载完成,造成线程阻塞,延迟启动服务。
异步回调模式将任务的执行过程和回调方法解耦,使两者可以在不同的线程中执行,这适用于多线程环境中的异步任务通知场景。它也体现了线程池实现异步任务及其回调的便利性,这是thread pool 编程中比较实用的模式和机制。
将任务提交给线程池执行,然后在回调方法中得到任务执行结果,这是一种比较简洁高效的异步任务通信方式,相比传统join()和future方式更加灵活和解耦。这也是我们学习使用线程池的原因之一。
在Netty和Spring等框架中都广泛应用了这种异步回调的模型,理解线程池的基本原理和机制有助于我们灵活使用各种并发框架。
这里给出线程池应用的一个实例场景:批量数据操作。比如批量插入数据库或调用第三方接口等。
如果不使用线程池,我们的代码可能如下:
List<Data> dataList = ... // 待操作的数据列表
for (Data data : dataList) {
insertToDB(data); // 插入数据库操作
}
这种方式会有两个问题:
使用线程池后,代码可以改进为:
List<Data> dataList = ...
ExecutorService executor = Executors.newFixedThreadPool(10); // 创建10个工作线程的线程池
for (Data data : dataList) {
executor.submit(() -> insertToDB(data)); // 提交插入任务到线程池
}
executor.shutdown(); // 关闭线程池
这种方式使用线程池实现了数据批量插入的异步执行,有以下好处:
这就是使用线程池改进批量数据操作的示例,避免了同步操作的阻塞和失败扩散问题,提高了系统吞吐量和数据操作的稳定性。
类似的场景还有:
总之,任何需要批量定量操作的数据或任务,如果操作时间较长,都适合采用线程池实现异步批处理。这既可以避免操作对主线程的影响,也可以设置并发量控制批处理速度,使系统资源不被洪水般的批处理任务淹没。
这里给出线程池另一个应用实例场景:网站爬虫。网站爬虫需要爬取大量网页数据,如果不使用线程池,代码可能如下:
public void crawl() {
for (String url : urls) {
Document doc = httpGet(url);
parse(doc);
extractLinks(doc);
}
}
这种单线程爬虫有几个问题:
使用线程池可以改进为:
ExecutorService executor = Executors.newFixedThreadPool(50); // 创建50个工作线程的线程池
for (String url : urls) {
executor.submit(() -> {
Document doc = httpGet(url);
parse(doc);
extractLinks(doc);
});
}
executor.shutdown();
这种多线程爬虫有明显优势:
这就是使用线程池改进网站爬虫的示例,实现了高效率、可控制和可靠的多线程爬虫程序。相比单线程爬虫具有明显优势,这也是大部分工业级爬虫软件选用的方案。
类似的场景还有:
OK,到这里我们的线程池学习就告一段落了。回顾一下,我们学习了以下主要内容:
线程池关键应用实例:网站爬虫,批量数据操作,高并发服务器等应用案例分析。