前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习五-pmq启动学习

pmq学习五-pmq启动学习

作者头像
路行的亚洲
发布2021-01-18 10:14:19
5160
发布2021-01-18 10:14:19
举报
文章被收录于专栏:后端技术学习

前面我们看到了pmq从端到端的调用,但是没有看到的还有很多细节的东西。比如我们看到在学习RocketMQ时,可以看到很多启动都是在broker中启动,那pmq中又是怎样启动的呢?本文重点来看pmq是如何启动服务,希望看完本文,你对于pmq的启动有一个清楚的认识。

启动mq-rest时,可以看到这样的日志:

我们首先注意的是它是在MqBoostrapListener中完成启动的。

而这正是基于观察者模式实现的,同时使用了类似SPI的技术,但是又不是SPI

代码语言:javascript
复制
//初始化
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
   if (!isInit) {
      try {
         //启动定时任务、启动broker定时任务、注册时间、注册report
         startTimer();
         startBrokeTimer();
         registerEvent();
         reportService.registerReport();
         isInit = true;
         log.info("mq初始化成功!");
      } catch (Exception e) {
         log.error("mq初始化异常", e);
         throw e;
      }
   }

}

在启动定时任务的时候,我们可以看到:

代码语言:javascript
复制
//启动定时任务
private void startTimer() {
   Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
   if (startedServices != null) {
      startedServices.entrySet().forEach(t1 -> {
         try {
            t1.getValue().start();
            log.info(t1.getKey() + "启动完成!");
         } catch (Exception e) {
            log.error(t1.getKey() + "启动异常!", e);
         }
      });
   }
}

看到这里是不是有点奇怪,通常通过反射拿到bean都是一个,而这里却是多个,而且是startTimer接口实现类,从日志里面可以看到。那SpringUtils里面的getBeans又做了什么呢?

代码语言:javascript
复制
// 通过class获取Bean.
public static <T> Map<String, T> getBeans(Class<T> clazz) {
         return getApplicationContext().getBeansOfType(clazz);
}

在实际工作中,我们经常会遇到一个接口及多个实现类的情况,并且在不同的条件下会使用不同的实现类。从使用方式上看,有些类似SPI的用法,但是由于SPI的使用并不是太方便,那么怎么办呢?我们可以借助ApplicationContext的getBeansOfType来实现我们需要的结果。而pmq就实现了。是不是恍然大悟了!

启动时,如果是pmq中的方法,可以看到日志中会带the guid is的标识,方便生产线上快速排查和定位问题。

也即在启动mq-rest的时候,就完成了相关服务的启动和初始化、注册。此时代表broker启动了。

按照上面的套路,我们来看到mq-ui里面完成了什么工作。

启动mq-ui,可以看到日志:

启动的时候,首先看到的MqStartProcessor,看命名可以猜出是基于spring的后置或者前置处理器实现:

代码语言:javascript
复制
//重写postProcessBeanFactory,这里做了mq客户端启动的初始化
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
   if (environment != null) { 
      if (initFlag.compareAndSet(false, true)) { 
         logger.info("消息客户端开始初始化!");
         MqClient.setSubscriberResolver(new SubscriberResolver());
         MqClientStartup.init(environment);
         //MqClientStartup.start();
         // statService.start();
         logger.info("消息客户端初始化完成!");
      }
   }
}

这里是基于后置处理器实现,实现扩展spring配置的方式之一。

同时里面也基于spring的applicationContext.getBeansOfType()实现:

同时可以看到还有度量监控信息服务的初始化:

完成初始化工作后,启动服务成功之后,mq-ui的日志还有一些信息:

那这个信息又是在做什么呢?

从实现的信息中可以看到都是获取它们的master。这里主要是看master是否发生变更,锁发生变更,如果发生变更,则进行邮件发送通知。

代码语言:javascript
复制
// @Transactional(rollbackFor = Exception.class)
public boolean isMaster() {
    try {
        if (!isLoad()) {
            return false;
        }
        init();
        boolean temp = checkMaster();
        if (temp != isMaster) {
            isMaster = temp;
            if (temp) {
                log.info("ip_{}_key_{} 获取到master!", ip, key);
                EmailUtil emailUtil = getEmail();
                if (emailUtil != null) {
                    emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 获取到master!", ip, key));
                }
            } else {
                log.info("ip_{}_key_{} 失去master!", ip, key);
                EmailUtil emailUtil = getEmail();
                if (emailUtil != null) {
                    emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 失去master!", ip, key));
                }
            }
        }
        isMaster = temp;
        return isMaster;
    } catch (Exception e) {
        return false;
    }
}

从pmq启动的日志,我们可以清晰的看到启动的脉络,是不是pmq的学习还是有乐趣的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档