专栏首页后端技术学习pmq学习五-pmq启动学习

pmq学习五-pmq启动学习

前面我们看到了pmq从端到端的调用,但是没有看到的还有很多细节的东西。比如我们看到在学习RocketMQ时,可以看到很多启动都是在broker中启动,那pmq中又是怎样启动的呢?本文重点来看pmq是如何启动服务,希望看完本文,你对于pmq的启动有一个清楚的认识。

启动mq-rest时,可以看到这样的日志:

我们首先注意的是它是在MqBoostrapListener中完成启动的。

而这正是基于观察者模式实现的,同时使用了类似SPI的技术,但是又不是SPI

//初始化
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
   if (!isInit) {
      try {
         //启动定时任务、启动broker定时任务、注册时间、注册report
         startTimer();
         startBrokeTimer();
         registerEvent();
         reportService.registerReport();
         isInit = true;
         log.info("mq初始化成功!");
      } catch (Exception e) {
         log.error("mq初始化异常", e);
         throw e;
      }
   }

}

在启动定时任务的时候,我们可以看到:

//启动定时任务
private void startTimer() {
   Map<String, TimerService> startedServices = SpringUtil.getBeans(TimerService.class);
   if (startedServices != null) {
      startedServices.entrySet().forEach(t1 -> {
         try {
            t1.getValue().start();
            log.info(t1.getKey() + "启动完成!");
         } catch (Exception e) {
            log.error(t1.getKey() + "启动异常!", e);
         }
      });
   }
}

看到这里是不是有点奇怪,通常通过反射拿到bean都是一个,而这里却是多个,而且是startTimer接口实现类,从日志里面可以看到。那SpringUtils里面的getBeans又做了什么呢?

// 通过class获取Bean.
public static <T> Map<String, T> getBeans(Class<T> clazz) {
         return getApplicationContext().getBeansOfType(clazz);
}

在实际工作中,我们经常会遇到一个接口及多个实现类的情况,并且在不同的条件下会使用不同的实现类。从使用方式上看,有些类似SPI的用法,但是由于SPI的使用并不是太方便,那么怎么办呢?我们可以借助ApplicationContext的getBeansOfType来实现我们需要的结果。而pmq就实现了。是不是恍然大悟了!

启动时,如果是pmq中的方法,可以看到日志中会带the guid is的标识,方便生产线上快速排查和定位问题。

也即在启动mq-rest的时候,就完成了相关服务的启动和初始化、注册。此时代表broker启动了。

按照上面的套路,我们来看到mq-ui里面完成了什么工作。

启动mq-ui,可以看到日志:

启动的时候,首先看到的MqStartProcessor,看命名可以猜出是基于spring的后置或者前置处理器实现:

//重写postProcessBeanFactory,这里做了mq客户端启动的初始化
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
   if (environment != null) { 
      if (initFlag.compareAndSet(false, true)) { 
         logger.info("消息客户端开始初始化!");
         MqClient.setSubscriberResolver(new SubscriberResolver());
         MqClientStartup.init(environment);
         //MqClientStartup.start();
         // statService.start();
         logger.info("消息客户端初始化完成!");
      }
   }
}

这里是基于后置处理器实现,实现扩展spring配置的方式之一。

同时里面也基于spring的applicationContext.getBeansOfType()实现:

同时可以看到还有度量监控信息服务的初始化:

完成初始化工作后,启动服务成功之后,mq-ui的日志还有一些信息:

那这个信息又是在做什么呢?

从实现的信息中可以看到都是获取它们的master。这里主要是看master是否发生变更,锁发生变更,如果发生变更,则进行邮件发送通知。

// @Transactional(rollbackFor = Exception.class)
public boolean isMaster() {
    try {
        if (!isLoad()) {
            return false;
        }
        init();
        boolean temp = checkMaster();
        if (temp != isMaster) {
            isMaster = temp;
            if (temp) {
                log.info("ip_{}_key_{} 获取到master!", ip, key);
                EmailUtil emailUtil = getEmail();
                if (emailUtil != null) {
                    emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 获取到master!", ip, key));
                }
            } else {
                log.info("ip_{}_key_{} 失去master!", ip, key);
                EmailUtil emailUtil = getEmail();
                if (emailUtil != null) {
                    emailUtil.sendWarnMail("锁发生变更" + key, String.format("ip_%s_key_%s 失去master!", ip, key));
                }
            }
        }
        isMaster = temp;
        return isMaster;
    } catch (Exception e) {
        return false;
    }
}

从pmq启动的日志,我们可以清晰的看到启动的脉络,是不是pmq的学习还是有乐趣的。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-01-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 对前端传入的json对象解析成多个对象

    multiRequestBodyDemo(@MultiRequestBody("dog")

    路行的亚洲
  • ReentranLock源码学习

    首先回答一个问题?线程的三大特性?什么时候我们需要锁?java中已经提供了synchronized,为什么还要使用ReentrantLock?AQS原理。

    路行的亚洲
  • ConcurrentHashMap源码学习

    既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与...

    路行的亚洲
  • linux的那些事

    每天在哔哩哔哩看黑马程序员的python教程20课时 ###2.每天必须记写笔记,并且在博客上发布

    blankmiss
  • JS魔法堂:从void 0 === undefined说起

    一、前言                                       当使用coffeescript书写如下代码时 name = person?...

    ^_^肥仔John
  • JDK1.8源码(十一)——java.util.TreeMap类

      在前面几篇博客分别介绍了这样几种集合,基于数组实现的ArrayList 类,基于链表实现的LinkedList 类,基于散列表实现的HashMap 类,本篇...

    IT可乐
  • 013.Zabbix的Items(监控项)

    Items是从主机里面获取的所有数据,可以配置获取监控数据的方式、取值的数据类型、获取数值的间隔、历史数据保存时间、趋势数据保存时间、监控key的分组等。

    木二
  • 如何写一个工业级的MySQL分布式锁组件?

    在分布式系统中,分布锁是一个最基础的工具类。例如,部署了2个有付款功能的微服务中,用户有可能对一个订单发起2次付款操作,而这2次请求可能被发到2个服务中,所以必...

    Java识堂
  • SpringBoot @Value中文乱码解决

    在使用spring boot开发的时候,有时候我们需要在配置文件application.properties文件中添加中文信息。在代码中使用@value获取.但...

    凯哥Java
  • 【MATLAB 从零到进阶】day11 描述性统计

    均值mean 方差var和标准差std 最值max/min 极差range 中位数median 分位数quantile/prctile 众数mode...

    统计学家

扫码关注云+社区

领取腾讯云代金券