可以看到在pmq-ui中启动时会执行启动ComsumerGroupRb服务,同时还会启动MessageLagNotify服务以及Queue队列服务。消费组重平衡服务、db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等
为了解决consumer和topic数量动态变化造成的问题,引入了重平衡(即consumer和queue的动态分配。
PMQ有一个重平衡器,它用来监控consumer的加入和退出、topic的扩容和缩容。
当某一个consumerGroup下的consumer数量发生变化,或者该consumerGroup订阅的topic的queue数量 发生了变化,就会触发重平衡器对该consumerGroup进行重平衡操作。
重平衡器对需要重平衡的consumerGroup,进行consumer和queue的重新分配。
//重平衡分配器 初始化
@PostConstruct
private void init() {
//初始化
super.init(Constants.RB, soaConfig.getRbCheckInterval(), soaConfig);
soaConfig.registerChanged(new Runnable() {
//通过soa配置获取重平衡检查interval
private volatile int interval = soaConfig.getRbCheckInterval(); @Override
public void run() {
if (soaConfig.getRbCheckInterval() != interval) {
interval = soaConfig.getRbCheckInterval();
updateInterval(interval);
}
}
});
}
进行初始化:
//注意interval为master判断间隔时间,当强行删除mqlock数据中的某条记录时,如果应用都启动了,必须等待一个interval周期才会开始新的master选举过程,如果有新的应用产生则进行新的选举,选择采取先到先得原则
public void init(String key, int interval, SoaConfig soaConfig) {
this.key = key;
mqLockService = new MqLockServiceImpl(key);
executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),
SoaThreadFactory.create(key, true), new ThreadPoolExecutor.DiscardOldestPolicy());
this.interval = interval;
this.soaConfig = soaConfig;
this.traceMessage = TraceFactory.getInstance(key);
}
执行mq锁定master操作:
//mq锁定服务实现
public MqLockServiceImpl(String key) {
this.key = key;
this.emailUtil = SpringUtil.getBean(EmailUtil.class);
this.traceMessage = TraceFactory.getInstance("lock-" + key);
this.heartbeatProperty = new HeartbeatProperty() {
@Override
public int getValue() {
//获取mq锁定心跳间隔
return soaConfig.getMqLockHeartBeatTime();
}
};
}
执行心跳操作:
// mq锁心跳发送时间间隔
public int getMqLockHeartBeatTime() {
try {
if (!_getMqLockHeartBeatTime
.equals(env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue))) {
_getMqLockHeartBeatTime = env.getProperty(env_getMqLockHeartBeatTime_key,
env_getMqLockHeartBeatTime_defaultValue);
getMqLockHeartBeatTime = Integer.parseInt(
env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue));
if (getMqLockHeartBeatTime < 15) {
getMqLockHeartBeatTime = 15;
}
onChange();
}
} catch (Exception e) {
getMqLockHeartBeatTime = 15;
onChange();
log.error("getgetMqLockHeartBeatTime_SoaConfig_error", e);
}
return getMqLockHeartBeatTime;
}
执行onChange操作:而onChange就是启动服务操作,根据key来启动服务
//执行onChange方法
private void onChange() {
executor.execute(() -> {
for (Runnable runnable : changed.keySet()) {
try {
runnable.run();
} catch (Exception e) {
log.error("onchange-error", e);
}
}
});
}
同时会将服务线程放入map中:
public void registerChanged(Runnable runnable) {
changed.put(runnable, true);
}
启动重平衡操作:重平衡启动核心,里面涉及到重要操作:
initNotifyMessageStatId(); 初始化通知消息statId
initRbData(consumerGroupEntities, consumerGroupMap); 初始化重平衡数据
consumerGroupService.rb(t1.queueOffsets); 重平衡
addRbCompleteLog(t1); 添加重平衡日志
updateNotifyMessageId(currentMaxId); 更新通知消息id
执行启动:重平衡
//执行启动
public void doStart() {
//如果是重平衡
if (!soaConfig.isEnableRb()) {
return;
}
//是否是master
if (lastMaster != isMaster()) {
//检查notifyMessageStatId,初始化notifyMessageStatId
if (!checkNotifyMessageStatId()) {
initNotifyMessageStatId();
}
//master
lastMaster=isMaster();
}
//获取通知消息id
long currentMaxId = getNotifyMessageId();
if (currentMaxId == 0) {
return;
}
//消费组实体列表
List<ConsumerGroupEntity> consumerGroupEntities = consumerGroupService
.getLastRbConsumerGroup(lastNotifyMessageId, currentMaxId);
if (CollectionUtils.isEmpty(consumerGroupEntities)) {
return;
}
Map<Long, ConsumerGroupQuqueVo> consumerGroupMap = new HashMap<>();
//执行初始化重平衡数据 重要
initRbData(consumerGroupEntities, consumerGroupMap);
for (ConsumerGroupQuqueVo t1 : consumerGroupMap.values()) {
rb(t1);
for (int i = 0; i < 3; i++) {
try {
//如果是master,则执行重平衡操作,重要执行rb操作
if (isMaster()) {
consumerGroupService.rb(t1.queueOffsets);
}
break;
} catch (Exception e) {
log.error("doCheckRebalance_error", e);
Util.sleep(5000);
}
}
//添加重平衡日志
addRbCompleteLog(t1);
}
//更新通知消息id
updateNotifyMessageId(currentMaxId);
//减少不活跃的消费者 计数
int count = consumerGroupConsumerService.deleteUnActiveConsumer();
if (count > 0) {
log.info("consumerGroupConsumer_empty,count is " + count);
}
}
除了这个之外,具体的调用重平衡的操作,执行rb操作,类似于下面的代码带rb重平衡的代码:
//执行rb重平衡操作
@Override
// @Transactional(rollbackFor = Exception.class)
public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
Map<Long, String> idsMap = new HashMap<>(30);
List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
//将传入的查询偏移量的消费组id放入,并执行更新操作
queueOffsetEntities.forEach(t1 -> {
idsMap.put(t1.getConsumerGroupId(), "");
NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
notifyMessageEntity.setMessageType(MessageType.Meta);
notifyMessageEntities.add(notifyMessageEntity);
// 更新consumerid 和consumername
queueOffsetService.updateConsumerId(t1);
});
// 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
updateRbVersion(new ArrayList<>(idsMap.keySet()));
// 批量插入消息事件
notifyMessageService.insertBatch(notifyMessageEntities);
}
或者:
// @Transactional(rollbackFor = Exception.class)
@Override
public void notifyRb(long id) {
updateRbVersion(Arrays.asList(id));
List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(id);
notifyMessageEntity.setMessageType(MessageType.Rb);
notifyMessageEntities.add(notifyMessageEntity);
notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(id);
notifyMessageEntity.setMessageType(MessageType.Meta);
notifyMessageEntities.add(notifyMessageEntity);
notifyMessageService.insertBatch(notifyMessageEntities);
}
执行重平衡操作的代码在ConsumerServiceImpl和ConsumerGroupServiceImpl中都有体现。这可以从消费者、消费组里面看到。
类似的,可以分析db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等。