前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习七-重平衡

pmq学习七-重平衡

作者头像
路行的亚洲
发布2021-02-03 10:09:22
4480
发布2021-02-03 10:09:22
举报
文章被收录于专栏:后端技术学习

可以看到在pmq-ui中启动时会执行启动ComsumerGroupRb服务,同时还会启动MessageLagNotify服务以及Queue队列服务。消费组重平衡服务、db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等

为了解决consumer和topic数量动态变化造成的问题,引入了重平衡(即consumer和queue的动态分配。

PMQ有一个重平衡器,它用来监控consumer的加入和退出、topic的扩容和缩容。

当某一个consumerGroup下的consumer数量发生变化,或者该consumerGroup订阅的topic的queue数量 发生了变化,就会触发重平衡器对该consumerGroup进行重平衡操作。

重平衡器对需要重平衡的consumerGroup,进行consumer和queue的重新分配。

代码语言:javascript
复制
//重平衡分配器 初始化
@PostConstruct
private void init() {
  //初始化
  super.init(Constants.RB, soaConfig.getRbCheckInterval(), soaConfig);

  soaConfig.registerChanged(new Runnable() {
     //通过soa配置获取重平衡检查interval
     private volatile int interval = soaConfig.getRbCheckInterval(); @Override
 public void run() {
    if (soaConfig.getRbCheckInterval() != interval) {
       interval = soaConfig.getRbCheckInterval();
       updateInterval(interval);
    }
 }

  });
}

进行初始化:

代码语言:javascript
复制
//注意interval为master判断间隔时间,当强行删除mqlock数据中的某条记录时,如果应用都启动了,必须等待一个interval周期才会开始新的master选举过程,如果有新的应用产生则进行新的选举,选择采取先到先得原则
public void init(String key, int interval, SoaConfig soaConfig) {
   this.key = key;
   mqLockService = new MqLockServiceImpl(key);
   executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),
         SoaThreadFactory.create(key, true), new ThreadPoolExecutor.DiscardOldestPolicy());
   this.interval = interval;
   this.soaConfig = soaConfig;
   this.traceMessage = TraceFactory.getInstance(key);
}

执行mq锁定master操作:

代码语言:javascript
复制
//mq锁定服务实现
public MqLockServiceImpl(String key) {
    this.key = key;
    this.emailUtil = SpringUtil.getBean(EmailUtil.class);
    this.traceMessage = TraceFactory.getInstance("lock-" + key);
    this.heartbeatProperty = new HeartbeatProperty() {
        @Override
        public int getValue() {
            //获取mq锁定心跳间隔
            return soaConfig.getMqLockHeartBeatTime();
        }
    };
}

执行心跳操作:

代码语言:javascript
复制
// mq锁心跳发送时间间隔
public int getMqLockHeartBeatTime() {
   try {
      if (!_getMqLockHeartBeatTime
            .equals(env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue))) {
         _getMqLockHeartBeatTime = env.getProperty(env_getMqLockHeartBeatTime_key,
               env_getMqLockHeartBeatTime_defaultValue);
         getMqLockHeartBeatTime = Integer.parseInt(
               env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue));
         if (getMqLockHeartBeatTime < 15) {
            getMqLockHeartBeatTime = 15;
         }
         onChange();
      }
   } catch (Exception e) {
      getMqLockHeartBeatTime = 15;
      onChange();
      log.error("getgetMqLockHeartBeatTime_SoaConfig_error", e);
   }
   return getMqLockHeartBeatTime;

}

执行onChange操作:而onChange就是启动服务操作,根据key来启动服务

代码语言:javascript
复制
//执行onChange方法
private void onChange() {
   executor.execute(() -> {
      for (Runnable runnable : changed.keySet()) {
         try {
            runnable.run();
         } catch (Exception e) {
            log.error("onchange-error", e);
         }
      }
   });
}

同时会将服务线程放入map中:

代码语言:javascript
复制
public void registerChanged(Runnable runnable) {
   changed.put(runnable, true);
}

启动重平衡操作:重平衡启动核心,里面涉及到重要操作:

代码语言:javascript
复制
initNotifyMessageStatId(); 初始化通知消息statId
initRbData(consumerGroupEntities, consumerGroupMap); 初始化重平衡数据
consumerGroupService.rb(t1.queueOffsets); 重平衡
addRbCompleteLog(t1); 添加重平衡日志
updateNotifyMessageId(currentMaxId); 更新通知消息id

执行启动:重平衡

代码语言:javascript
复制
//执行启动
public void doStart() {
    //如果是重平衡
    if (!soaConfig.isEnableRb()) {
        return;
    }
    //是否是master
    if (lastMaster != isMaster()) {
        //检查notifyMessageStatId,初始化notifyMessageStatId
        if (!checkNotifyMessageStatId()) {
            initNotifyMessageStatId();
        }
        //master
        lastMaster=isMaster();
    }
    //获取通知消息id
    long currentMaxId = getNotifyMessageId();
    if (currentMaxId == 0) {
        return;
    }
    //消费组实体列表
    List<ConsumerGroupEntity> consumerGroupEntities = consumerGroupService
        .getLastRbConsumerGroup(lastNotifyMessageId, currentMaxId);
    if (CollectionUtils.isEmpty(consumerGroupEntities)) {
        return;
    }
    Map<Long, ConsumerGroupQuqueVo> consumerGroupMap = new HashMap<>();
    //执行初始化重平衡数据 重要
    initRbData(consumerGroupEntities, consumerGroupMap);
    for (ConsumerGroupQuqueVo t1 : consumerGroupMap.values()) {
        rb(t1);
        for (int i = 0; i < 3; i++) {
            try {
                //如果是master,则执行重平衡操作,重要执行rb操作
                if (isMaster()) {
                    consumerGroupService.rb(t1.queueOffsets);
                }
                break;
            } catch (Exception e) {
                log.error("doCheckRebalance_error", e);
                Util.sleep(5000);
            }
        }
        //添加重平衡日志
        addRbCompleteLog(t1);
    }
    //更新通知消息id
    updateNotifyMessageId(currentMaxId);
    //减少不活跃的消费者 计数
    int count = consumerGroupConsumerService.deleteUnActiveConsumer();
    if (count > 0) {
        log.info("consumerGroupConsumer_empty,count is " + count);
    }
}

除了这个之外,具体的调用重平衡的操作,执行rb操作,类似于下面的代码带rb重平衡的代码:

代码语言:javascript
复制
//执行rb重平衡操作
@Override
// @Transactional(rollbackFor = Exception.class)
public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
   Map<Long, String> idsMap = new HashMap<>(30);
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
   //将传入的查询偏移量的消费组id放入,并执行更新操作
   queueOffsetEntities.forEach(t1 -> {
      idsMap.put(t1.getConsumerGroupId(), "");
      NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
      notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
      notifyMessageEntity.setMessageType(MessageType.Meta);
      notifyMessageEntities.add(notifyMessageEntity);
      // 更新consumerid 和consumername
      queueOffsetService.updateConsumerId(t1);
   });
   // 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
   updateRbVersion(new ArrayList<>(idsMap.keySet()));
   // 批量插入消息事件
   notifyMessageService.insertBatch(notifyMessageEntities);

}

或者:

代码语言:javascript
复制
// @Transactional(rollbackFor = Exception.class)
@Override
public void notifyRb(long id) {
   updateRbVersion(Arrays.asList(id));
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
   NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Rb);
   notifyMessageEntities.add(notifyMessageEntity);

   notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Meta);
   notifyMessageEntities.add(notifyMessageEntity);
   notifyMessageService.insertBatch(notifyMessageEntities);
}

执行重平衡操作的代码在ConsumerServiceImpl和ConsumerGroupServiceImpl中都有体现。这可以从消费者、消费组里面看到。

类似的,可以分析db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档