专栏首页后端技术学习pmq学习三-mq客户端启动的流程

pmq学习三-mq客户端启动的流程

我们知道在RocketMQ中,服务端代表的是broker,而客户端才是我们的生产者和消费者。而pmq中,也是如此,服务端是broker,而客户端是生产者和消费者。客户端与spring集成,是从这里开始的,可以看到mq启动处理器实现了BeanFactoryPostProcessor,重写了postProcessBeanFactory后置处理器bean工厂。这里基本上涉及到IMqFactory上的接口。

客户端启动流程:

MqClient启动:

public class MqClientStartup {
   //spring初始化完成 ,重要
   public static void springInitComplete() {
      MqClient.start();
      monitorConfig();
   }

    //初始化
   public static void init(Environment env1) {
      if (initFlag.compareAndSet(false, true)) {
         env = env1;
         initConfig();
      }
   }

    //监控配置
      private static void monitorConfig() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                while (isRunning) {
                    updateConfig();
                    Util.sleep(2000);
                }
            }

        });
    }
    //更新配置
    protected static void updateConfig() {
        if (properties == null) {
            properties = MqClient.getContext().getConfig().getProperties();
        }
        setRbTimes();
        setPbTimes();
        setAsynCapacity();
        setMetaMode();
        setPullDeltaTime();
        setPublishAsynTimeout();
    }

 }  

而这里可以看到启动MqClient.start():

public static boolean start() {
    //启动时,会注册消费者组
   if (startFlag.compareAndSet(false, true)) {
      registerConsumerGroup();
   }
   return false;
}

//可以看到只要启动时才会注册消费者组
public static boolean registerConsumerGroup(Map<String, ConsumerGroupVo> groups) {
        if (groups == null || groups.size() == 0) {
            return false;
        }
         //如果已经初始化,此时执行注册消费组
        if (hasInit()) {
            log.info("已经初始化完成!");
            return doRegisterConsumerGroup(groups);
        } else {
             //否者等待初始化完成,然后执行注册消费组 
            log.warn("系统为初始化,启动异步注册!");
            executor.execute(new Runnable() {
                public void run() {
                    while (!hasInit()) {
                        Util.sleep(2000);
                    }
                doRegisterConsumerGroup(groups);
            });
            return true;
        }
    }    

执行注册消费组

private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {

    //变量消费者组
   for (ConsumerGroupVo consumerGroup : groups.values()) {
     //校验消费组  
      if (!checkVaild(consumerGroup)) {
         return false;
      }
       //如果拿到的消费者组版本包含消费者组拿到的元数据中的名称,则设置为false,同时提示已订阅
      if (mqContext.getConsumerGroupVersion().containsKey(consumerGroup.getMeta().getName())) {
         log.info("ConsumerGroup:" + consumerGroup.getMeta().getName() + " has  subscribed,已订阅!");
         return false;
      }
       //消费者组通过元数据拿到originName如果为空,则设置originName
      if (Util.isEmpty(consumerGroup.getMeta().getOriginName())) {
         consumerGroup.getMeta().setOriginName(consumerGroup.getMeta().getName());
      }
       //消费组拿到topic如果不为空,则消费组名称放入信息
      if (consumerGroup.getTopics() != null) {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(),
               new ArrayList<>(consumerGroup.getTopics().keySet()));
      } else {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(), new ArrayList<>());
      }
      groupNames += consumerGroup.getMeta().getName() + ",";
   }
   //事件触发,将注册设置为true
   register();
    //消费组注册请求,request设置信息包括:消费者名称、消费者id、客户端id、消费者名称、设置subEnv
   ConsumerGroupRegisterRequest request = new ConsumerGroupRegisterRequest();
   request.setConsumerGroupNames(consumerGroupNames);
   request.setConsumerId(mqContext.getConsumerId());
   request.setClientIp(mqContext.getConfig().getIp());
   request.setConsumerName(mqContext.getConsumerName());
   if (MqClient.getMqEnvironment() != null) {
      if (MqEnv.FAT == MqClient.getMqEnvironment().getEnv()) {
         request.setSubEnv(MqClient.getMqEnvironment().getSubEnv().toLowerCase());
      }
   }
   try {
      //消费者组注册响应,注册消费者,放入请求
      ConsumerGroupRegisterResponse consumerGroupRegisterResponse = mqContext.getMqResource()
            .registerConsumerGroup(request);
       //如果消费者组注册响应为success,则填充信息
      if (consumerGroupRegisterResponse.isSuc()) {
         Map<String, String> broadcastConsumerGroupNames = consumerGroupRegisterResponse
               .getConsumerGroupNameNew();
         for (ConsumerGroupVo consumerGroup : groups.values()) {
            if (broadcastConsumerGroupNames != null
                  && broadcastConsumerGroupNames.containsKey(consumerGroup.getMeta().getOriginName())) {
               consumerGroup.getMeta()
                     .setName(broadcastConsumerGroupNames.get(consumerGroup.getMeta().getOriginName()));
            }
            mqContext.getConfigConsumerGroup().put(consumerGroup.getMeta().getName(), consumerGroup);
            mqContext.getConsumerGroupVersion().put(consumerGroup.getMeta().getName(), 0L);
            //fire消费者组注册事件为true getRegisterConsumerGroupListeners()线程启动
             fireConsumerGroupRegisterEvent(consumerGroup);
         }
          //创建消费者polling服务,并启动consumerPollingService服务,mqCheckService创建,同时启动
         consumerPollingService = mqFactory.createConsumerPollingService();
         consumerPollingService.start();
         mqCheckService = mqFactory.createMqCheckService();
         mqCheckService.start();
         // MqCheckService.getInstance().start(mqContext);
         log.info(groupNames + "  subscribe_suc,订阅成功!and json is " + JsonUtil.toJson(request));
      } else {
         throw new RuntimeException("registerConsumerGroup_error, the req is" + JsonUtil.toJsonNull(request)
               + ",and resp is " + JsonUtil.toJson(consumerGroupRegisterResponse));
      }
   } catch (Exception e) {
      log.error("consumer_group_register_error", e);
      throw new RuntimeException(e);
   }
   return true;
}

执行consumerPollingService、mqCheckService启动

@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      isStop = false;
      runStatus = false;
      executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
            SoaThreadFactory.create("ConsumerPollingService", true),
            new ThreadPoolExecutor.DiscardOldestPolicy());
      executor.execute(new Runnable() {
         @Override
         public void run() {
            while (!isStop) {
                //将允许状态设置为true,同时将traceMessaageItem设置为true,执行longPolling操作
               TraceMessageItem traceMessageItem = new TraceMessageItem();
               runStatus = true;
               try {
                  traceMessageItem.status = "suc";
                  longPolling();
               } catch (Exception e) {
                  // e.printStackTrace();
                  traceMessageItem.status = "fail";
                  Util.sleep(1000);
               }
                //链路追踪添加链路信息,同时设置为false
               traceMsg.add(traceMessageItem);
               runStatus = false;
            }
         }
      });
   }
}

设置consumerPollingService状态为true,同时启动longPolling方法,也即此时会启动链路追踪

protected void longPolling() {
   if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
         && mqContext.getConsumerGroupVersion().size() > 0) {
       //设置事务
      Transaction transaction = Tracer.newTransaction("mq-group", "longPolling");
      try {
         //创建获取消费组请求对象,放入消费者id、消费者组版本 
         GetConsumerGroupRequest request = new GetConsumerGroupRequest();
         request.setConsumerId(mqContext.getConsumerId());
         request.setConsumerGroupVersion(mqContext.getConsumerGroupVersion());
          //获取消费者组响应
         GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
         if (response != null && response.getConsumerDeleted() == 1) {
            log.info("consumerid为" + request.getConsumerId());
         }
          //执行处理group
         handleGroup(response);
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      } finally {
         transaction.complete();
      }
   } else {
      Util.sleep(1000);
   }
}

执行handlerGroup:

protected void handleGroup(GetConsumerGroupResponse response) {
   if (isStop) {
      return;
   }
    //响应如果不为空,则设置broker元数据模式
   if (response != null) {
      mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
      if (MqClient.getMqEnvironment() != null && MqClient.getMqEnvironment().getEnv() == MqEnv.FAT) {
         MqClient.getContext().setAppSubEnvMap(response.getConsumerGroupSubEnvMap());
      }
   }
    //mq客户端重新启动
   if (response != null && response.getConsumerDeleted() == 1) {
      MqClient.reStart();
      Util.sleep(5000);
      return;
   } else if (response != null && response.getConsumerGroups() != null
         && response.getConsumerGroups().size() > 0) {
      log.info("get_consumer_group_data,获取到的最新消费者组数据为:" + JsonUtil.toJson(response));
      TraceMessageItem item = new TraceMessageItem();
      item.status = "changed";
      item.msg = JsonUtil.toJson(response);
       //响应拿到消费者组,进行遍历,如果没有stop,同时请求为新请求,请求放入,同时创建mq组线程服务
      response.getConsumerGroups().entrySet().forEach(t1 -> {
         if (!isStop) {
            if (!mqExcutors.containsKey(t1.getKey())) {
               mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
            }
            log.info("consumer_group_data_change,消费者组" + t1.getKey() + "发生重平衡或者meta更新");
            // 进行重平衡操作或者更新元数据信息
            mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());  
            mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
         }
      });
      traceMsg.add(item);
   }
   // 然后启动
   mqExcutors.values().forEach(t1 -> {
      t1.start();
   });
}

可以看到MqGroupExcutorService信息:

public MqGroupExcutorService(IMqResource mqResource) {
   //mq上下文、mqResource、mq工厂 
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
   this.mqFactory = MqClient.getMqFactory();
}

同时可以看到MqCheckService信息:

public MqCheckService(IMqResource mqResource) {
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
}

Mq启动后置处理器:这里是和Spring集成的方式:

@Component
public class MqStartProcessor implements BeanFactoryPostProcessor, PriorityOrdered, EnvironmentAware {
   @Override
   public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
      if (environment != null) { 
         if (initFlag.compareAndSet(false, true)) { 
            logger.info("消息客户端开始初始化!");
            MqClient.setSubscriberResolver(new SubscriberResolver());
             //初始化
            MqClientStartup.init(environment);
            //MqClientStartup.start();
            // statService.start();
            logger.info("消息客户端初始化完成!");
         }
      }
   }

}

mq客户端启动初始化:

public static void init(Environment env1) {
   if (initFlag.compareAndSet(false, true)) {
      env = env1;
      initConfig();
   }
}

//初始化配置
private static void initConfig() {
        MqConfig config = new MqConfig();
        String netCard = System.getProperty("mq.network.netCard", env.getProperty("mq.network.netCard", ""));       
        String url =System.getProperty("mq.broker.url", env.getProperty("mq.broker.url", ""));      
        String host = System.getProperty("mq.client.host", env.getProperty("mq.client.host", ""));
        String serverPort = System.getProperty("server.port", env.getProperty("server.port", "8080"));
        String asynCapacity = System.getProperty("mq.asyn.capacity", env.getProperty("mq.asyn.capacity", "2000"));
        String rbTimes = System.getProperty("mq.rb.times", env.getProperty("mq.rb.times", "4"));
        String pbRetryTimes = System.getProperty("mq.pb.retry.times", env.getProperty("mq.pb.retry.times", "10"));
        String readTimeOut = System.getProperty("mq.http.timeout", env.getProperty("mq.http.timeout", "10000"));
        String pullDeltaTime = System.getProperty("mq.pull.time.delta", env.getProperty("mq.pull.time.delta", "150"));
        boolean metaMode = "true"
                .equals(System.getProperty("mq.broker.metaMode", env.getProperty("mq.broker.metaMode", "true")));

         //mq客户端初始化,然后更新配置 
        MqClient.init(config);
        updateConfig();
    }

进行初始化:

public static void init(MqConfig config) {
   if (initFlag.compareAndSet(false, true)) {
      //执行初始化 
      doInit(config);
      //激活初始化事件  getInitCompleted 线程启动
      fireInitEvent();
      log.info("mq_client has  inited,初始化完成");
   }
}

//执行初始化
private static void doInit(MqConfig config) {
        //设置消费者名称
        mqContext.setConsumerName(
                ConsumerUtil.getConsumerId(config.getIp(), PropUtil.getProcessId() + "", config.getServerPort()));
         //如果mq上下文里面的mq信息为null,mqContext填充配置信息,同时异步消息为空,则创建msgAsyn队列。
        if (mqContext.getMqResource() == null) {
            mqContext.setMqResource(
                    getMqFactory().createMqResource(config.getUrl(), config.getReadTimeOut(), config.getReadTimeOut()));
        }
        mqContext.setConfig(config);
        if (msgsAsyn == null) {
            msgsAsyn = new ArrayBlockingQueue<>(config.getAsynCapacity());
        }
        //创建mqBrokerUrlRefresh服务,同时启动mqBrokerUrl刷新服务
        mqBrokerUrlRefreshService = mqFactory.createMqBrokerUrlRefreshService();
        mqBrokerUrlRefreshService.start();
    }

启动:

@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {          
      isStop = false;
      runStatus = false;
      //执行更新brokerUrls操作,使用单例线程池
      doUpdateBrokerUrls();
      executor = Executors.newScheduledThreadPool(1,
            SoaThreadFactory.create("mq-brokerFreshService-pool-%d", Thread.MAX_PRIORITY - 1, true));
      executor.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            if (!isStop) {
               runStatus = true;
               doUpdateBrokerUrls();
               runStatus = false;
            }
         }
      }, 1, 20, TimeUnit.SECONDS);
   }
}

进行启动:

protected void doUpdateBrokerUrls() {
        try {
            GetMetaGroupResponse response = this.mqResource.getMetaGroup(request);
            if(response==null){
                return;
            }
            if (response != null && response.isSuc()) {
                mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
                mqContext.setMetricUrl(response.getMetricUrl());
                if(Util.isEmpty(mqContext.getMetricUrl())){
                    //MqMeticReporterService.getInstance(mqClientBase).close();
                    MqClient.getMqFactory().createMqMeticReporterService().close();
                }else{
                    //创建mqMeticReportService,并启动
                    MqClient.getMqFactory().createMqMeticReporterService().start();
                }
            }           
            if (mqContext.getBrokerMetaMode() == 1 || (mqContext.getBrokerMetaMode() == 0 && mqContext.getConfig().isMetaMode())) {
                //List<String> brokerUrls = response.getBrokerIp();
                if (response.getBrokerIpG1()!=null) {                   
                    mqContext.setBrokerUrls(response.getBrokerIpG1(),response.getBrokerIpG2());
                }
            } else if (mqContext.getBrokerMetaMode() == -1 || !mqContext.getConfig().isMetaMode()) {
                mqContext.setBrokerUrls(new ArrayList<>(),new ArrayList<>());
            }
        } catch (Exception e) {
            log.error("updateBrokerError", e);
        }

    }

执行数据上报服务:

@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      // 每30s上报数据
      reporter.start(30, TimeUnit.SECONDS);
   }
}

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-12-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ReentranLock源码学习

    首先回答一个问题?线程的三大特性?什么时候我们需要锁?java中已经提供了synchronized,为什么还要使用ReentrantLock?AQS原理。

    路行的亚洲
  • 对前端传入的json对象解析成多个对象

    multiRequestBodyDemo(@MultiRequestBody("dog")

    路行的亚洲
  • mybatis学习二

    前面我们通过看到mybatis中的session中的sqlSessionTest中,为我们提供了面向sqlSession的编程方式。但是对于面向对象的方式,这种...

    路行的亚洲
  • 日本研究人员设计“无声地震”探测系统,地震预警更精确

    2月3日,成都发生5.1级地震,震源深度21千米,好在无人员伤亡,民众情绪稳定。我们不得不感叹,在天灾面前人类总是如此渺小和脆弱。

    大数据文摘
  • 运营型数据库系列之性能概述

    这篇博客文章是CDP中Cloudera的运营数据库(OpDB)系列文章的一部分。每篇文章都会详细介绍新功能。从该系列的开头开始,请参阅《CDP中的运营数据库》,...

    大数据杂货铺
  • WordPress自定义url 中的“author” 别名

    默认的话,WordPress 链接到文章“作者”的别名(slug name)是如 devework.com/author/name 那样的,通过下面的代码,可以...

    Jeff
  • 中小企业本地链接策略有哪些呢?

    对于中心企业而言,我们最常听到的一件事情就是外链建设难,诚然这是目前SEO行业中,最让SEO人员头痛的一个问题,但有的时候我们面对问题,总是具有一定的思维局限性...

    蝙蝠侠IT
  • 每日算法系列【LeetCode 1363】形成三的最大倍数

    给你一个整数数组 digits,你可以通过按任意顺序连接其中某些数字来形成 3 的倍数,请你返回所能得到的最大的 3 的倍数。

    godweiyang
  • RecyclerView实现混合布局

    PS:好长时间不写博客了,起初是不知道写些什么,后来接触了到了很多东西,原本看似简单的东西,背后都隐藏着巨大的秘密,想handler的使用,一般情况下会引起内存...

    cMusketeer
  • 浅谈pycharm导入pandas包遇到的问题及解决

    最近受疫情影响,每天呆在家里上网课,三点一线地过着生活,不过在家跟在学校都是一样的,一样要上课听老师读PPT,一样要在电脑上敲自己也不是很懂的代码。这个学期我们...

    砸漏

扫码关注云+社区

领取腾讯云代金券