前面我们看到了pmq从端到端的调用,但是没有看到的还有很多细节的东西。比如我们看到在学习RocketMQ时,可以看到很多启动都是在broker中启动,那pmq中又是怎样启动的呢?本文重点来看pmq是如何启动服务,希望看完本文,你对于pmq的启动有一个清楚的认识。
启动mq-rest时,可以看到这样的日志:
我们首先注意的是它是在MqBoostrapListener中完成启动的。
而这正是基于观察者模式实现的,同时使用了类似SPI的技术,但是又不是SPI
//初始化
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isInit) {
try {
//启动定时任务、启动broker定时任务、注册时间、注册report
startTimer();
startBrokeTimer();
registerEvent();
reportService.registerReport();
isInit = true;
log.info("mq初始化成功!");
} catch (Exception e) {
log.error("mq初始化异常", e);
throw e;
}
}
}
在启动定时任务的时候,我们可以看到:
//启动定时任务
private void startTimer() {
Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
if (startedServices != null) {
startedServices.entrySet().forEach(t1 -> {
try {
t1.getValue().start();
log.info(t1.getKey() + "启动完成!");
} catch (Exception e) {
log.error(t1.getKey() + "启动异常!", e);
}
});
}
}
看到这里是不是有点奇怪,通常通过反射拿到bean都是一个,而这里却是多个,而且是startTimer接口实现类,从日志里面可以看到。那SpringUtils里面的getBeans又做了什么呢?
// 通过class获取Bean.
public static <T> Map<String, T> getBeans(Class<T> clazz) {
return getApplicationContext().getBeansOfType(clazz);
}
在实际工作中,我们经常会遇到一个接口及多个实现类的情况,并且在不同的条件下会使用不同的实现类。从使用方式上看,有些类似SPI的用法,但是由于SPI的使用并不是太方便,那么怎么办呢?我们可以借助ApplicationContext的getBeansOfType来实现我们需要的结果。而pmq就实现了。是不是恍然大悟了!
启动时,如果是pmq中的方法,可以看到日志中会带the guid is的标识,方便生产线上快速排查和定位问题。
也即在启动mq-rest的时候,就完成了相关服务的启动和初始化、注册。此时代表broker启动了。
按照上面的套路,我们来看到mq-ui里面完成了什么工作。
启动mq-ui,可以看到日志:
启动的时候,首先看到的MqStartProcessor,看命名可以猜出是基于spring的后置或者前置处理器实现:
//重写postProcessBeanFactory,这里做了mq客户端启动的初始化
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
if (environment != null) {
if (initFlag.compareAndSet(false, true)) {
logger.info("消息客户端开始初始化!");
MqClient.setSubscriberResolver(new SubscriberResolver());
MqClientStartup.init(environment);
//MqClientStartup.start();
// statService.start();
logger.info("消息客户端初始化完成!");
}
}
}
这里是基于后置处理器实现,实现扩展spring配置的方式之一。
同时里面也基于spring的applicationContext.getBeansOfType()实现:
同时可以看到还有度量监控信息服务的初始化:
完成初始化工作后,启动服务成功之后,mq-ui的日志还有一些信息:
那这个信息又是在做什么呢?
从实现的信息中可以看到都是获取它们的master。这里主要是看master是否发生变更,锁发生变更,如果发生变更,则进行邮件发送通知。
// @Transactional(rollbackFor = Exception.class)
public boolean isMaster() {
try {
if (!isLoad()) {
return false;
}
init();
boolean temp = checkMaster();
if (temp != isMaster) {
isMaster = temp;
if (temp) {
log.info("ip_{}_key_{} 获取到master!", ip, key);
EmailUtil emailUtil = getEmail();
if (emailUtil != null) {
emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 获取到master!", ip, key));
}
} else {
log.info("ip_{}_key_{} 失去master!", ip, key);
EmailUtil emailUtil = getEmail();
if (emailUtil != null) {
emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 失去master!", ip, key));
}
}
}
isMaster = temp;
return isMaster;
} catch (Exception e) {
return false;
}
}
从pmq启动的日志,我们可以清晰的看到启动的脉络,是不是pmq的学习还是有乐趣的。