1)如果要实现一个动态线程池,首先需要考虑的是将线程池的相关配置信息外置。这样出现问题的时候,能够基于配置修改,实现热部署。修改配置后,就能生效。因此,可以考虑的配置方式有多种:nacos、apollo、zookeeper、consul、etcd等。
2)如果线程池出现问题或者完成修改后,能够基于监控的信息,进行通知和告警。这样就需要考虑通知和告警的方式的多样性:比如基于钉钉、微信、飞书、电子邮件等渠道进行通知和告警。
根据引入的dynamic-tp-spring-cloud-starter-nacos或者dynamic-tp-spring-boot-starter-nacos依赖
以nacos为例:
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
<version>1.0.9</version>
</dependency>
可以看到自动装配的文件:DtpAutoConfiguration与NacosRefresher.
在DtpAutoConfiguration中,我们可以看到导入的配置信息:
@ImportAutoConfiguration({BaseBeanAutoConfiguration.class})
基于这个注解,关注BaseBeanAutoConfiguration这个类:
这个类主要干了下面这几件事请
DtpProperties 线程池相关配置
上下文holder dtpApplicationContextHolder
dtpBanner打印 dtpBannerPrinter
dtp后置处理器 dtpPostProcessor
dtp注册 dtpRegistry
dtp监控 dtpMonitor
dtpEndpoint dtpEndpoint
其中
1)dtpBanner是做控制台启动项目时的banner打印操作。
2)dtpPostProcessor dtp后置处理器 处理所有相关bean
如果bean是执行器,则注册dtp,此时会注册到DTP_REGISTRY 中, 数据结构:Map
否则会基于ApplicationHolder拿到基于DynamicTp注解的class,如果当前基于DynamicTp的methodMetadata 为空,则返回bean,否则拿到dtpAnnotationVal。poolName也即dtpAnnotationVal。
如果当前的bean属于线程池任务执行器,则注册task执行器。包装执行器,放入通知信息notifyItems。registerCommon 执行注册。
3)dtpRegistry 注册dtp DTP_REGISTRY 数据结构:Map
其中最为重要的方法是刷新方法:
获取dtp执行器,对执行器进行转换为DtpMainProp。执行刷新。
4)dtp监控 dtpMonitor会执行监控发布:里面有有2个方法需要注意:
检查监控 checkAlarm
collect 收集监控指标信息
其中:检查监控的时候,会基于当前的发送告警的信息:基于对应的渠道进行消息发送。
此时会发布两个事件:publishAlarmCheckEvent、publishCollectEvent
NacosRefresher中存在的方法Refresh: 刷新当监听到配置发生改变的时候,doNoticeAsync 执行异步通知,通知业务方,此时发生了配置的变更。
刷新完成后,执行RefreshEvent刷新事件发布。
发布完成后,可以看到对应的监听是在
com.dtp.starter.adapter.common.autoconfigure.AdapterCommonAutoConfiguration
适配器公共自动装配中
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
try {
if (event instanceof RefreshEvent) {
doRefresh(((RefreshEvent) event).getDtpProperties());
} else if (event instanceof CollectEvent) {
doCollect(((CollectEvent) event).getDtpProperties());
} else if (event instanceof AlarmCheckEvent) {
doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
}
} catch (Exception e) {
log.error("DynamicTp adapter, event handle failed.", e);
}
}
可以看到我们关心的三个发布事件,都在此进行了监听:
doRefresh、doCollect、doAlarmCheck
其中
刷新事件会执行相关渠道的通知
收集日志会执行对应的打印
告警信息会执行告警
使用方式:以nacos为例,可以看到其基于@EnableDynamicTp实现对dtp相关bean的注册。DtpBeanDefinitionRegistrar即是完成注册的类。其主要是创建dtp配置对象DtpProperties,绑定dtp配置,获取执行器。拿到执行器后,遍历执行,绑定对应的信息,构建构造函数,注册bean信息。方便后续对线程池的操作。
@Resource
private ThreadPoolExecutor dtpExecutor1;
@GetMapping("/dtp-nacos-example/test")
public String test() throws InterruptedException {
task();
return "success";
}
//获取dtp执行器
public void task() throws InterruptedException {
//获取dtp执行器
DtpExecutor dtpExecutor2 = DtpRegistry.getDtpExecutor("dtpExecutor2");
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
dtpExecutor1.execute(() -> {
log.info("i am dynamic-tp-test-1 task");
});
dtpExecutor2.execute(NamedRunnable.of(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("i am dynamic-tp-test-2 task");
}, "task-" + i));
}
}
由此可以看到实现了两个最为主要的功能:对线程池进行动态变更和对线程池的监控告警。使用了观察者模式、适配器模式。