前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习三-mq客户端启动的流程

pmq学习三-mq客户端启动的流程

作者头像
路行的亚洲
发布2021-01-05 12:22:19
8440
发布2021-01-05 12:22:19
举报
文章被收录于专栏:后端技术学习后端技术学习

我们知道在RocketMQ中,服务端代表的是broker,而客户端才是我们的生产者和消费者。而pmq中,也是如此,服务端是broker,而客户端是生产者和消费者。客户端与spring集成,是从这里开始的,可以看到mq启动处理器实现了BeanFactoryPostProcessor,重写了postProcessBeanFactory后置处理器bean工厂。这里基本上涉及到IMqFactory上的接口。

客户端启动流程:

MqClient启动:

代码语言:javascript
复制
public class MqClientStartup {
   //spring初始化完成 ,重要
   public static void springInitComplete() {
      MqClient.start();
      monitorConfig();
   }

    //初始化
   public static void init(Environment env1) {
      if (initFlag.compareAndSet(false, true)) {
         env = env1;
         initConfig();
      }
   }

    //监控配置
      private static void monitorConfig() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                while (isRunning) {
                    updateConfig();
                    Util.sleep(2000);
                }
            }

        });
    }
    //更新配置
    protected static void updateConfig() {
        if (properties == null) {
            properties = MqClient.getContext().getConfig().getProperties();
        }
        setRbTimes();
        setPbTimes();
        setAsynCapacity();
        setMetaMode();
        setPullDeltaTime();
        setPublishAsynTimeout();
    }

 }  

而这里可以看到启动MqClient.start():

代码语言:javascript
复制
public static boolean start() {
    //启动时,会注册消费者组
   if (startFlag.compareAndSet(false, true)) {
      registerConsumerGroup();
   }
   return false;
}

//可以看到只要启动时才会注册消费者组
public static boolean registerConsumerGroup(Map<String, ConsumerGroupVo> groups) {
        if (groups == null || groups.size() == 0) {
            return false;
        }
         //如果已经初始化,此时执行注册消费组
        if (hasInit()) {
            log.info("已经初始化完成!");
            return doRegisterConsumerGroup(groups);
        } else {
             //否者等待初始化完成,然后执行注册消费组 
            log.warn("系统为初始化,启动异步注册!");
            executor.execute(new Runnable() {
                public void run() {
                    while (!hasInit()) {
                        Util.sleep(2000);
                    }
                doRegisterConsumerGroup(groups);
            });
            return true;
        }
    }    

执行注册消费组

代码语言:javascript
复制
private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {

    //变量消费者组
   for (ConsumerGroupVo consumerGroup : groups.values()) {
     //校验消费组  
      if (!checkVaild(consumerGroup)) {
         return false;
      }
       //如果拿到的消费者组版本包含消费者组拿到的元数据中的名称,则设置为false,同时提示已订阅
      if (mqContext.getConsumerGroupVersion().containsKey(consumerGroup.getMeta().getName())) {
         log.info("ConsumerGroup:" + consumerGroup.getMeta().getName() + " has  subscribed,已订阅!");
         return false;
      }
       //消费者组通过元数据拿到originName如果为空,则设置originName
      if (Util.isEmpty(consumerGroup.getMeta().getOriginName())) {
         consumerGroup.getMeta().setOriginName(consumerGroup.getMeta().getName());
      }
       //消费组拿到topic如果不为空,则消费组名称放入信息
      if (consumerGroup.getTopics() != null) {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(),
               new ArrayList<>(consumerGroup.getTopics().keySet()));
      } else {
         consumerGroupNames.put(consumerGroup.getMeta().getOriginName(), new ArrayList<>());
      }
      groupNames += consumerGroup.getMeta().getName() + ",";
   }
   //事件触发,将注册设置为true
   register();
    //消费组注册请求,request设置信息包括:消费者名称、消费者id、客户端id、消费者名称、设置subEnv
   ConsumerGroupRegisterRequest request = new ConsumerGroupRegisterRequest();
   request.setConsumerGroupNames(consumerGroupNames);
   request.setConsumerId(mqContext.getConsumerId());
   request.setClientIp(mqContext.getConfig().getIp());
   request.setConsumerName(mqContext.getConsumerName());
   if (MqClient.getMqEnvironment() != null) {
      if (MqEnv.FAT == MqClient.getMqEnvironment().getEnv()) {
         request.setSubEnv(MqClient.getMqEnvironment().getSubEnv().toLowerCase());
      }
   }
   try {
      //消费者组注册响应,注册消费者,放入请求
      ConsumerGroupRegisterResponse consumerGroupRegisterResponse = mqContext.getMqResource()
            .registerConsumerGroup(request);
       //如果消费者组注册响应为success,则填充信息
      if (consumerGroupRegisterResponse.isSuc()) {
         Map<String, String> broadcastConsumerGroupNames = consumerGroupRegisterResponse
               .getConsumerGroupNameNew();
         for (ConsumerGroupVo consumerGroup : groups.values()) {
            if (broadcastConsumerGroupNames != null
                  && broadcastConsumerGroupNames.containsKey(consumerGroup.getMeta().getOriginName())) {
               consumerGroup.getMeta()
                     .setName(broadcastConsumerGroupNames.get(consumerGroup.getMeta().getOriginName()));
            }
            mqContext.getConfigConsumerGroup().put(consumerGroup.getMeta().getName(), consumerGroup);
            mqContext.getConsumerGroupVersion().put(consumerGroup.getMeta().getName(), 0L);
            //fire消费者组注册事件为true getRegisterConsumerGroupListeners()线程启动
             fireConsumerGroupRegisterEvent(consumerGroup);
         }
          //创建消费者polling服务,并启动consumerPollingService服务,mqCheckService创建,同时启动
         consumerPollingService = mqFactory.createConsumerPollingService();
         consumerPollingService.start();
         mqCheckService = mqFactory.createMqCheckService();
         mqCheckService.start();
         // MqCheckService.getInstance().start(mqContext);
         log.info(groupNames + "  subscribe_suc,订阅成功!and json is " + JsonUtil.toJson(request));
      } else {
         throw new RuntimeException("registerConsumerGroup_error, the req is" + JsonUtil.toJsonNull(request)
               + ",and resp is " + JsonUtil.toJson(consumerGroupRegisterResponse));
      }
   } catch (Exception e) {
      log.error("consumer_group_register_error", e);
      throw new RuntimeException(e);
   }
   return true;
}

执行consumerPollingService、mqCheckService启动

代码语言:javascript
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      isStop = false;
      runStatus = false;
      executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
            SoaThreadFactory.create("ConsumerPollingService", true),
            new ThreadPoolExecutor.DiscardOldestPolicy());
      executor.execute(new Runnable() {
         @Override
         public void run() {
            while (!isStop) {
                //将允许状态设置为true,同时将traceMessaageItem设置为true,执行longPolling操作
               TraceMessageItem traceMessageItem = new TraceMessageItem();
               runStatus = true;
               try {
                  traceMessageItem.status = "suc";
                  longPolling();
               } catch (Exception e) {
                  // e.printStackTrace();
                  traceMessageItem.status = "fail";
                  Util.sleep(1000);
               }
                //链路追踪添加链路信息,同时设置为false
               traceMsg.add(traceMessageItem);
               runStatus = false;
            }
         }
      });
   }
}

设置consumerPollingService状态为true,同时启动longPolling方法,也即此时会启动链路追踪

代码语言:javascript
复制
protected void longPolling() {
   if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
         && mqContext.getConsumerGroupVersion().size() > 0) {
       //设置事务
      Transaction transaction = Tracer.newTransaction("mq-group", "longPolling");
      try {
         //创建获取消费组请求对象,放入消费者id、消费者组版本 
         GetConsumerGroupRequest request = new GetConsumerGroupRequest();
         request.setConsumerId(mqContext.getConsumerId());
         request.setConsumerGroupVersion(mqContext.getConsumerGroupVersion());
          //获取消费者组响应
         GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
         if (response != null && response.getConsumerDeleted() == 1) {
            log.info("consumerid为" + request.getConsumerId());
         }
          //执行处理group
         handleGroup(response);
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      } finally {
         transaction.complete();
      }
   } else {
      Util.sleep(1000);
   }
}

执行handlerGroup:

代码语言:javascript
复制
protected void handleGroup(GetConsumerGroupResponse response) {
   if (isStop) {
      return;
   }
    //响应如果不为空,则设置broker元数据模式
   if (response != null) {
      mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
      if (MqClient.getMqEnvironment() != null && MqClient.getMqEnvironment().getEnv() == MqEnv.FAT) {
         MqClient.getContext().setAppSubEnvMap(response.getConsumerGroupSubEnvMap());
      }
   }
    //mq客户端重新启动
   if (response != null && response.getConsumerDeleted() == 1) {
      MqClient.reStart();
      Util.sleep(5000);
      return;
   } else if (response != null && response.getConsumerGroups() != null
         && response.getConsumerGroups().size() > 0) {
      log.info("get_consumer_group_data,获取到的最新消费者组数据为:" + JsonUtil.toJson(response));
      TraceMessageItem item = new TraceMessageItem();
      item.status = "changed";
      item.msg = JsonUtil.toJson(response);
       //响应拿到消费者组,进行遍历,如果没有stop,同时请求为新请求,请求放入,同时创建mq组线程服务
      response.getConsumerGroups().entrySet().forEach(t1 -> {
         if (!isStop) {
            if (!mqExcutors.containsKey(t1.getKey())) {
               mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
            }
            log.info("consumer_group_data_change,消费者组" + t1.getKey() + "发生重平衡或者meta更新");
            // 进行重平衡操作或者更新元数据信息
            mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());  
            mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
         }
      });
      traceMsg.add(item);
   }
   // 然后启动
   mqExcutors.values().forEach(t1 -> {
      t1.start();
   });
}

可以看到MqGroupExcutorService信息:

代码语言:javascript
复制
public MqGroupExcutorService(IMqResource mqResource) {
   //mq上下文、mqResource、mq工厂 
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
   this.mqFactory = MqClient.getMqFactory();
}

同时可以看到MqCheckService信息:

代码语言:javascript
复制
public MqCheckService(IMqResource mqResource) {
   this.mqContext = MqClient.getContext();
   this.mqResource = mqResource;
}

Mq启动后置处理器:这里是和Spring集成的方式:

代码语言:javascript
复制
@Component
public class MqStartProcessor implements BeanFactoryPostProcessor, PriorityOrdered, EnvironmentAware {
   @Override
   public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
      if (environment != null) { 
         if (initFlag.compareAndSet(false, true)) { 
            logger.info("消息客户端开始初始化!");
            MqClient.setSubscriberResolver(new SubscriberResolver());
             //初始化
            MqClientStartup.init(environment);
            //MqClientStartup.start();
            // statService.start();
            logger.info("消息客户端初始化完成!");
         }
      }
   }

}

mq客户端启动初始化:

代码语言:javascript
复制
public static void init(Environment env1) {
   if (initFlag.compareAndSet(false, true)) {
      env = env1;
      initConfig();
   }
}

//初始化配置
private static void initConfig() {
        MqConfig config = new MqConfig();
        String netCard = System.getProperty("mq.network.netCard", env.getProperty("mq.network.netCard", ""));       
        String url =System.getProperty("mq.broker.url", env.getProperty("mq.broker.url", ""));      
        String host = System.getProperty("mq.client.host", env.getProperty("mq.client.host", ""));
        String serverPort = System.getProperty("server.port", env.getProperty("server.port", "8080"));
        String asynCapacity = System.getProperty("mq.asyn.capacity", env.getProperty("mq.asyn.capacity", "2000"));
        String rbTimes = System.getProperty("mq.rb.times", env.getProperty("mq.rb.times", "4"));
        String pbRetryTimes = System.getProperty("mq.pb.retry.times", env.getProperty("mq.pb.retry.times", "10"));
        String readTimeOut = System.getProperty("mq.http.timeout", env.getProperty("mq.http.timeout", "10000"));
        String pullDeltaTime = System.getProperty("mq.pull.time.delta", env.getProperty("mq.pull.time.delta", "150"));
        boolean metaMode = "true"
                .equals(System.getProperty("mq.broker.metaMode", env.getProperty("mq.broker.metaMode", "true")));

         //mq客户端初始化,然后更新配置 
        MqClient.init(config);
        updateConfig();
    }

进行初始化:

代码语言:javascript
复制
public static void init(MqConfig config) {
   if (initFlag.compareAndSet(false, true)) {
      //执行初始化 
      doInit(config);
      //激活初始化事件  getInitCompleted 线程启动
      fireInitEvent();
      log.info("mq_client has  inited,初始化完成");
   }
}

//执行初始化
private static void doInit(MqConfig config) {
        //设置消费者名称
        mqContext.setConsumerName(
                ConsumerUtil.getConsumerId(config.getIp(), PropUtil.getProcessId() + "", config.getServerPort()));
         //如果mq上下文里面的mq信息为null,mqContext填充配置信息,同时异步消息为空,则创建msgAsyn队列。
        if (mqContext.getMqResource() == null) {
            mqContext.setMqResource(
                    getMqFactory().createMqResource(config.getUrl(), config.getReadTimeOut(), config.getReadTimeOut()));
        }
        mqContext.setConfig(config);
        if (msgsAsyn == null) {
            msgsAsyn = new ArrayBlockingQueue<>(config.getAsynCapacity());
        }
        //创建mqBrokerUrlRefresh服务,同时启动mqBrokerUrl刷新服务
        mqBrokerUrlRefreshService = mqFactory.createMqBrokerUrlRefreshService();
        mqBrokerUrlRefreshService.start();
    }

启动:

代码语言:javascript
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {          
      isStop = false;
      runStatus = false;
      //执行更新brokerUrls操作,使用单例线程池
      doUpdateBrokerUrls();
      executor = Executors.newScheduledThreadPool(1,
            SoaThreadFactory.create("mq-brokerFreshService-pool-%d", Thread.MAX_PRIORITY - 1, true));
      executor.scheduleAtFixedRate(new Runnable() {
         @Override
         public void run() {
            if (!isStop) {
               runStatus = true;
               doUpdateBrokerUrls();
               runStatus = false;
            }
         }
      }, 1, 20, TimeUnit.SECONDS);
   }
}

进行启动:

代码语言:javascript
复制
protected void doUpdateBrokerUrls() {
        try {
            GetMetaGroupResponse response = this.mqResource.getMetaGroup(request);
            if(response==null){
                return;
            }
            if (response != null && response.isSuc()) {
                mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
                mqContext.setMetricUrl(response.getMetricUrl());
                if(Util.isEmpty(mqContext.getMetricUrl())){
                    //MqMeticReporterService.getInstance(mqClientBase).close();
                    MqClient.getMqFactory().createMqMeticReporterService().close();
                }else{
                    //创建mqMeticReportService,并启动
                    MqClient.getMqFactory().createMqMeticReporterService().start();
                }
            }           
            if (mqContext.getBrokerMetaMode() == 1 || (mqContext.getBrokerMetaMode() == 0 && mqContext.getConfig().isMetaMode())) {
                //List<String> brokerUrls = response.getBrokerIp();
                if (response.getBrokerIpG1()!=null) {                   
                    mqContext.setBrokerUrls(response.getBrokerIpG1(),response.getBrokerIpG2());
                }
            } else if (mqContext.getBrokerMetaMode() == -1 || !mqContext.getConfig().isMetaMode()) {
                mqContext.setBrokerUrls(new ArrayList<>(),new ArrayList<>());
            }
        } catch (Exception e) {
            log.error("updateBrokerError", e);
        }

    }

执行数据上报服务:

代码语言:javascript
复制
@Override
public void start() {
   if (startFlag.compareAndSet(false, true)) {
      // 每30s上报数据
      reporter.start(30, TimeUnit.SECONDS);
   }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档