前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习六-broker启动

pmq学习六-broker启动

作者头像
路行的亚洲
发布2021-02-03 10:09:09
6260
发布2021-02-03 10:09:09
举报
文章被收录于专栏:后端技术学习

不管是生产消息还是消费消息,都会涉及到消息偏移量。在前面我们看到在启动broker的时候,会启动broker,而此时启动broker的同时,此时pmq里面会首先会去检查线程池的线程,如果与SoaConfig中的配置的线程不一样,则会进行调整,然后执行提交操作commit。根据启动提供的线索,我们可以找到broker。

下图来自pmq的官方文档

如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,

同时设置线程池的核心线程池大小,执行提交偏移量操作(重点),提交偏移的过程也会涉及到元数据更新操作

提交的过程中,通过随机的方式来避免数据库的洪峰压力

提交提交分为单条和批量,而批量采用同步并发容器CountDownLatch提交

此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息

里面有两个概念:偏移量版本和偏移量的概念,偏移量我们好理解,而偏移量版本是针对每次修改版本都会+1

在偏移量或者版本号修改的过程中,此时的元数据必然需要进行改变,此时会调用到更新元数据的操作

此时元数据更新:

broker启动

代码语言:javascript
复制
/**
 * 如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,
 * 同时设置线程池的核心线程池大小,执行提交偏移量操作(重点)
 */
@Override
public void startBroker() {
   commitThreadSize = soaConfig.getCommitThreadSize();
   executorRun = new ThreadPoolExecutor(commitThreadSize + 1, commitThreadSize + 1, 10L, TimeUnit.SECONDS,
         new ArrayBlockingQueue<>(50), SoaThreadFactory.create("commit-run", Thread.MAX_PRIORITY - 1, true),
         new ThreadPoolExecutor.CallerRunsPolicy());
   soaConfig.registerChanged(new Runnable() {
      @Override
      public void run() {
         if (commitThreadSize != soaConfig.getCommitThreadSize()) {
            commitThreadSize = soaConfig.getCommitThreadSize();
            executorRun.setCorePoolSize(commitThreadSize + 1);
            executorRun.setMaximumPoolSize(commitThreadSize + 1);
         }

      }
   });
   executorRun.execute(() -> {
      //提交消息偏移量
      commitOffset();
   });

}

为了缓解提交过快,在kafka中可以设置一扰动值,而为了避免压力会执行sleep操作

代码语言:javascript
复制
/**
 * 提交偏移量
 */
protected void commitOffset() {
   log.info("doSubmitOffset");
   while (isRunning) {
      //执行提交
      doCommit();
      // 通过随机的方式来避免数据库的洪峰压力
      Util.sleep(soaConfig.getCommitSleepTime());
   }
}

执行提交:提交消息分为单条和批量,而单条则直接执行,而批量则采用同步并发容器CountDownLatch提交。

提交偏移量

此时会涉及到单条和批量提交

代码语言:javascript
复制
//执行提交操作
protected void doCommit() {
   //获取偏移量版本
   Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
   Map<Long, ConsumerQueueVersionDto> map = new HashMap<>(mapAppPolling.get());
   if (map.size() > 0) {
      final int size = map.size();
      Transaction transaction = Tracer.newTransaction("Timer-service", "commit");
      try {
         int countSize = map.size() < commitThreadSize ? map.size() : commitThreadSize;
         //单条消息
         if (countSize == 1) {
            for (Map.Entry<Long, ConsumerQueueVersionDto> entry : map.entrySet()) {
               doCommitOffset(entry.getValue(), 0, offsetVersionMap, size);
            }
         } else {
            //多条消息,同步并发容器CountDownLatch提交
            CountDownLatch countDownLatch = new CountDownLatch(countSize);
            for (Map.Entry<Long, ConsumerQueueVersionDto> entry : map.entrySet()) {
               executorRun.execute(new Runnable() {
                  @Override
                  public void run() {
                     //提交消息
                     doCommitOffset(entry.getValue(), 0, offsetVersionMap, size);
                     countDownLatch.countDown();
                  }
               });
            }
            countDownLatch.await();
         }
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      } finally {
         transaction.complete();
      }
   }
}

同时:此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息,这里我觉得这里的bean操作可以使用bean拷贝,使用cglib的bean拷贝工具可以保证代码的简洁性。

代码语言:javascript
复制
//提交偏移量
protected boolean doCommitOffset(ConsumerQueueVersionDto request, int flag,
      Map<Long, OffsetVersionEntity> offsetVersionMap, int count) {
   Transaction catTransaction = null;
   OffsetVersionEntity offsetVersionEntity = offsetVersionMap.get(request.getQueueOffsetId());
   if (checkOffsetAndVersion(request, offsetVersionEntity)) {
      try {
         catTransaction = Tracer.newTransaction("Timer-service",
               MqConstanst.CONSUMERPRE + "/commitOffset-" + flag);
         QueueOffsetEntity queueOffsetEntity = new QueueOffsetEntity();
         queueOffsetEntity.setId(request.getQueueOffsetId());
         queueOffsetEntity.setOffsetVersion(request.getOffsetVersion());
         queueOffsetEntity.setOffset(request.getOffset());
         queueOffsetEntity.setConsumerGroupName(request.getConsumerGroupName());
         queueOffsetEntity.setTopicName(request.getTopicName());
         //此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
         if (queueOffsetService.commitOffset(queueOffsetEntity) > 0 && offsetVersionEntity != null) {
            reentrantLock.lock();
            if (request.getOffsetVersion() == offsetVersionEntity.getOffsetVersion()
                  && request.getOffset() > offsetVersionEntity.getOffset()) {
               offsetVersionEntity.setOffset(request.getOffset());
            //这里使用if是不是更好一些呢    
            } else if (request.getOffsetVersion() > offsetVersionEntity.getOffsetVersion()) {
               offsetVersionEntity.setOffsetVersion(request.getOffsetVersion());
               offsetVersionEntity.setOffset(request.getOffset());
            }
            reentrantLock.unlock();
         }
         catTransaction.setStatus(Transaction.SUCCESS);

         return true;
      } catch (Exception e) {
         failMapAppPolling.put(request.getQueueOffsetId(), request);
         log.error("doSubmitOffset失败", e);
         catTransaction.setStatus(e);
         return false;
      } finally {
         catTransaction.complete();
      }
   }
   return true;
}

这就是关于偏移量的代码。

这里涉及到偏移量版本的概念:OffsetVersionEntity

代码语言:javascript
复制
/**
 * 偏移量版本实体类
 */
public class OffsetVersionEntity {
   //id
   private long id;
   //偏移量
   private long offset;
   //偏移量版本
   private long offsetVersion;
}

同时查询消息偏移量实体:

代码语言:javascript
复制
/**
 * @author dal-generator
 */
public class QueueOffsetEntity {

    /**
     * 
     */
     private long id;

    /**
     * 订阅者组id
     */
     private long consumerGroupId;

    /**
     * 订阅者组名称
     */
     private String consumerGroupName;

    /**
     * 客户端消费者name
     */
     private String consumerName;

    /**
     * 生产者id或者消费者id
     */
     private long consumerId;

    /**
     * 主题id
     */
     private long topicId;

    /**
     * 主题名称,如果
     */
     private String topicName;

    /**
     * 如果是失败队列此字段名称表示原始的topic名称,topic_name为consumer_group_name+原始的topic_name+"_fail",否则topic_name和origin_topic_name一致
     */
     private String originTopicName;

    /**
     * 1,表示正常队列,2,表示失败队列
     */
     private int topicType;

    /**
     * 分区id
     */
     private long queueId;

    /**
     * 消费者提交的偏移量
     */
     private long offset;

    /**
     * 订阅时的起始偏移量
     */
     private long startOffset;

    /**
     * 偏移版本号,当手动修改偏移时,会升级版本号,如果客户端提交更新偏移量的时候,只能按照版本号相同,偏移量大的值更新
     */
     private long offsetVersion;

    /**
     * 1,表示客户端此queue停止消费,0,表示正常消费
     */
     private int stopFlag;

    /**
     * ip+db_name +tb_name
     */
     private String dbInfo;

    /**
     * 操作人
     */
     private String insertBy;

    /**
     * 创建时间
     */
     private Date insertTime;

    /**
     * 操作人
     */
     private String updateBy;

    /**
     * 更新时间
     */
     private Date updateTime;

    /**
     * 逻辑删除
     */
     private int isActive;

    /**
     * 元数据更新时间
     */
     private Date metaUpdateTime;

    /**
     * 原始的消费者组名
     */
     private String originConsumerGroupName;

    /**
     * 1,为集群模式,2,为广播模式,3,为代理模式
     */
     private int consumerGroupMode;

    /**
     * 环境
     */
     private String subEnv;

上面是偏移量的基本代码。当然会涉及到元数据更新的问题。下面是元数据更新的操作。

队列元数据更新操作

考虑元数据更新的多种请求,同时对停止当前的偏移量进行判断,如果等于消费偏移量,则重新执行提交操作,否者执行删除偏移操作。

在偏移量或者版本号修改的过程中,此时的元数据必然需要进行改变,此时会调用到更新元数据的操作updateQueueMeta:

代码语言:javascript
复制
//更新元数据
@Override
public void updateQueueMeta(ConsumerQueueDto consumerQueue) {
   synchronized (lockMetaObj) {
      doUpdateQueueMeta(consumerQueue);
   }
}

执行更新元数据操作:

代码语言:javascript
复制
/**
 * 消费队列版本dto
 */
public class ConsumerQueueVersionDto {
   //队列偏移id
   private long queueOffsetId;
   //偏移量版本
   private volatile long offsetVersion;
   //偏移量
   private volatile long offset;
   //消费者组名称
   private String consumerGroupName;
   //主题名称
   private String topicName;
 }  

执行元数据更新操作思路:

代码语言:javascript
复制
1.查看消息队列中线程大小和当前的线程数是否相等,如果不等,则更新元数据线程信息
2.获取队列偏移量版本,比较版本,如果相同,则此时偏移量满了,需要执行扩容操作,然后获取,此时是元数据发生更新
3.发生队列重新消费,偏移量更新,为了防止拉取到数据后,会有阻塞,清除掉后,会消除阻塞
4.停止消费,检查偏移量版本和消费拿到的版本是否相同,如果相同,则执行再一次提交操作,否者则说明版本不同,此时需要执行删除操作
5.异常

代码:

代码语言:javascript
复制
//执行更新队列元数据操作
protected void doUpdateQueueMeta(ConsumerQueueDto consumerQueue) {
   Transaction transaction = Tracer.newTransaction("mq-group", "updateQueueMeta-" + consumerQueue.getTopicName());
   try {
      if (consumerQueue.getTimeout() == 0) {
         timeOutCount.set(0);
      }
      //获取当前的消费队列dto
      ConsumerQueueDto temp = consumerQueueRef.get();
      //获取flag,如果不同,则更新线程池线程数大小
      boolean flag = consumerQueue.getThreadSize() != temp.getThreadSize();
      if (flag) {
         log.info("update_thread_size,更新线程数" + consumerQueue.getTopicName());
         executor.setCorePoolSize(consumerQueue.getThreadSize() + 2);
         executor.setMaximumPoolSize(consumerQueue.getThreadSize() + 2);
      }
      // 此时是元数据发生更新,比较版本,版本相同,则说明需要扩容
      if (consumerQueue.getOffsetVersion() == temp.getOffsetVersion()) {
         log.info("update meta with topic:" + consumerQueue.getTopicName());
         // 更新元数据
         updateQueueMetaWithOutOffset(consumerQueue);

      } else {
         // 此时需要进行偏移更新
         log.info("queue offset changed,发生队列重新消费" + consumerQueue.getTopicName());
         consumerQueueRef.set(consumerQueue);
         // 此时是为了防止拉取到数据后,会有阻塞,清除掉后,会消除阻塞
         messages.clear();
         // 防止清除后,messages 里面有数据
         Util.sleep(100);
         // 再次清理,确保message 里面为空,同时使得拉取消息释放锁。
         messages.clear();
         // 确保更新拉取消息的起始值,为偏移重置的值,加锁是防止拉取与重置同时操作
         consumerQueue.setLastId(consumerQueue.getOffset());
         // 说明修改偏移了需要重新,拉取
         this.lastId = consumerQueue.getOffset();
      }
      if (isRunning && consumerQueue.getStopFlag() == 1) {
         log.info("stop deal,停止消费" + consumerQueue.getTopicName());
         TraceMessageItem item = new TraceMessageItem();
         doCommit(temp);
         item.status = "停止消费";
         item.msg = temp.getOffset() + "-" + temp.getOffsetVersion();
         traceMsgCommit.add(item);
      }
      isRunning = consumerQueue.getStopFlag() == 0;
      transaction.setStatus(Transaction.SUCCESS);
   } catch (Exception e) {
      transaction.setStatus(e);
   } finally {
      transaction.complete();
   }
}

执行doCommit操作:

代码语言:javascript
复制
/**
 * 执行提交消息
 * @param temp
 */
protected void doCommit(ConsumerQueueDto temp) {
   BatchRecorderItem item = batchRecorder.getLastestItem();
   if (item != null) {
      doCommit(temp, item);
   }
}

//执行提交操作
private void doCommit(ConsumerQueueDto temp, BatchRecorderItem batchRecorderItem) {
   if (batchRecorderItem == null)
      return;
   CommitOffsetRequest request = new CommitOffsetRequest();
    //检查偏移量版本和消费拿到的版本是否相同,如果相同,则执行再一次提交操作,否者则说明版本不同,此时需要执行删除操作
   if (checkOffsetVersion(temp)) {
      List<ConsumerQueueVersionDto> queueVersionDtos = new ArrayList<>();
      request.setQueueOffsets(queueVersionDtos);
      ConsumerQueueVersionDto consumerQueueVersionDto = new ConsumerQueueVersionDto();
      // consumerQueueVersionDto.setOffset(temp.getOffset());
      consumerQueueVersionDto.setOffset(batchRecorderItem.maxId);
      consumerQueueVersionDto.setOffsetVersion(temp.getOffsetVersion());
      consumerQueueVersionDto.setQueueOffsetId(temp.getQueueOffsetId());
      consumerQueueVersionDto.setConsumerGroupName(temp.getConsumerGroupName());
      consumerQueueVersionDto.setTopicName(temp.getTopicName());
      // request.setFailIds(preFailIds);
      queueVersionDtos.add(consumerQueueVersionDto);
      mqResource.commitOffset(request);
   }
   batchRecorder.delete(batchRecorderItem.batchReacorderId);
}

mqResource.commitOffset(request):

代码语言:javascript
复制
//提交偏移量
public void commitOffset(CommitOffsetRequest request) {
   if (request == null) {
      return;
   }
    //提交偏移量操作
   String url = MqConstanst.CONSUMERPRE + "/commitOffset";
   try {
      post(request, url, 10, CommitOffsetResponse.class, false);
   } catch (Exception e) {
      CatRequest request2 = new CatRequest();
      request2.setMethod("commitOffset");
      request2.setJson(JsonUtil.toJson(request));
      request2.setMsg(e.getMessage());
      addCat(request2);
   }
}

执行提交偏移量操作:

代码语言:javascript
复制
// 发送心跳,直接返回
@PostMapping("/commitOffset")
public CommitOffsetResponse commitOffset(@RequestBody CommitOffsetRequest request) {
   return consumerCommitService.commitOffset(request);
}

最终会调用doCommitOffset操作:

代码语言:javascript
复制
/**
 * 提交偏移量
 * @param request
 * @return
 */
@Override
public CommitOffsetResponse commitOffset(CommitOffsetRequest request) {
   // Transaction catTransaction = Tracer.newTransaction("Timer-service",
   // "commitOffset");
   CommitOffsetResponse response = new CommitOffsetResponse();
   response.setSuc(true);
   Map<Long, ConsumerQueueVersionDto> map = mapAppPolling.get();
   try {
      //如果请求不为空,同时请求的队列偏移量不为空,则执行提交操作
      if (request != null && !CollectionUtils.isEmpty(request.getQueueOffsets())) {
         request.getQueueOffsets().forEach(t1 -> {
            ConsumerQueueVersionDto temp = map.get(t1.getQueueOffsetId());
            boolean flag1 = true;
            if (temp == null) {
               synchronized (lockObj1) {
                  temp = map.get(t1.getQueueOffsetId());
                  if (temp == null) {
                     map.put(t1.getQueueOffsetId(), t1);
                     flag1 = false;
                  }
               }
            }
            if (flag1) {
               if (temp.getOffsetVersion() < t1.getOffsetVersion()) {
                  clearOldData();
                  map.put(t1.getQueueOffsetId(), t1);
               } else if (temp.getOffsetVersion() == t1.getOffsetVersion()
                     && temp.getOffset() < t1.getOffset()) {
                  clearOldData();
                  map.put(t1.getQueueOffsetId(), t1);
               }
            }
         });
         if (request.getFlag() == 1) {
            Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
            request.getQueueOffsets().forEach(t1 -> {
               //执行提交偏移量
               doCommitOffset(t1, 1, offsetVersionMap, 0);
            });
         }
      }
   } catch (Exception e) {
   }
   // catTransaction.setStatus(Transaction.SUCCESS);
   // catTransaction.complete();
   return response;
}

执行提交偏移操作:

代码语言:javascript
复制
//提交偏移量
protected boolean doCommitOffset(ConsumerQueueVersionDto request, int flag,
      Map<Long, OffsetVersionEntity> offsetVersionMap, int count) {
   Transaction catTransaction = null;
   OffsetVersionEntity offsetVersionEntity = offsetVersionMap.get(request.getQueueOffsetId());
   if (checkOffsetAndVersion(request, offsetVersionEntity)) {
      try {
         //flag是标识
         catTransaction = Tracer.newTransaction("Timer-service",
               MqConstanst.CONSUMERPRE + "/commitOffset-" + flag);
         QueueOffsetEntity queueOffsetEntity = new QueueOffsetEntity();
         queueOffsetEntity.setId(request.getQueueOffsetId());
         queueOffsetEntity.setOffsetVersion(request.getOffsetVersion());
         queueOffsetEntity.setOffset(request.getOffset());
         queueOffsetEntity.setConsumerGroupName(request.getConsumerGroupName());
         queueOffsetEntity.setTopicName(request.getTopicName());
         //此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
         if (queueOffsetService.commitOffset(queueOffsetEntity) > 0 && offsetVersionEntity != null) {
            reentrantLock.lock();
            if (request.getOffsetVersion() == offsetVersionEntity.getOffsetVersion()
                  && request.getOffset() > offsetVersionEntity.getOffset()) {
               offsetVersionEntity.setOffset(request.getOffset());
            } else if (request.getOffsetVersion() > offsetVersionEntity.getOffsetVersion()) {
               offsetVersionEntity.setOffsetVersion(request.getOffsetVersion());
               offsetVersionEntity.setOffset(request.getOffset());
            }
            reentrantLock.unlock();
         }
         catTransaction.setStatus(Transaction.SUCCESS);

         return true;
      } catch (Exception e) {
         failMapAppPolling.put(request.getQueueOffsetId(), request);
         log.error("doSubmitOffset失败", e);
         catTransaction.setStatus(e);
         return false;
      } finally {
         catTransaction.complete();
      }
   }
   return true;
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 不管是生产消息还是消费消息,都会涉及到消息偏移量。在前面我们看到在启动broker的时候,会启动broker,而此时启动broker的同时,此时pmq里面会首先会去检查线程池的线程,如果与SoaConfig中的配置的线程不一样,则会进行调整,然后执行提交操作commit。根据启动提供的线索,我们可以找到broker。
    • broker启动
      • 提交偏移量
        • 队列元数据更新操作
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档