前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习四-生产消息到存储到消费的过程

pmq学习四-生产消息到存储到消费的过程

作者头像
路行的亚洲
发布2021-01-18 10:14:08
5300
发布2021-01-18 10:14:08
举报
文章被收录于专栏:后端技术学习后端技术学习

需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。

过程:生产者发送消息publish->基于httpclient请求到mq-rest中的ConsumerController的publish,然后请求了publish,然后在消费者实现里面执行保存消息doSaveMsg,同时执行notifyClient操作,到notifyMsg,执行notify的过程中会执行doPollingData->pullData操作,请求到mq-rest,然后执行pullData操作。

其流程如下:

pmq流程图

pullData

代码语言:javascript
复制
public boolean publish(PublishMessageRequest request, int retryTimes) {
    //CONSUMERPRE="/api/client/consumer" url是/api/client/consumer/publish
    String url = MqConstanst.CONSUMERPRE + "/publish";
            long start = System.nanoTime();
            PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
}

找到这个url的请求,按照我们以前的习惯,一个请求过来,通常会请求到controller,因此我们可以找到:

mq-rest中的ConsumerController,它相当于一个中介,将数据存储和发送消息publish联系起来,在mq-biz中将消息放入分配好的写队列、主题队列,进行存储。

代码语言:javascript
复制
@RestController
@RequestMapping(MqConstanst.CONSUMERPRE)
public class ConsumerController {    
    //执行publish操作
    @PostMapping("/publish")
    public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
       setSubEnv(request);
        //执行publish操作
       PublishMessageResponse response = consumerService.publish(request);
       return response;

}

此时可以在mq-biz中找到具体的实现:

此时思考一下,发送消息必须经过的流程,将消息进行存储,然后通知消费者,可以消费了。

代码语言:javascript
复制
//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
    PublishMessageResponse response = new PublishMessageResponse();
    //校验
    checkVaild(request, response);
    if (!response.isSuc()) {
        return response;
    }
    try {
        if (!checkTopicRate(request, response)) {
            return response;
        }
        //获取所有分配好的topic写队列
        Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
        //获取所以分配好的主题队列
        Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
        if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
            List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
            if (queueEntities == null || queueEntities.size() == 0) {
                response.setSuc(false);
                response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
                //如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
                if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                    queueEntities = topicQueueMap.get(request.getTopicName());
                    updateQueueCache(request.getTopicName());
                } else {
                    updateQueueCache(request.getTopicName());
                    return response;
                }
            }
            //如果队列实体列表>0,则执行保存消息操作
            if (queueEntities.size() > 0) {
                saveMsg(request, response, queueEntities);
            }
        } else {
            response.setSuc(false);
            response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
            return response;
        }
    } catch (Exception e) {
        log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
        response.setSuc(false);
        response.setMsg(e.getMessage());
    } finally {
        //最终,将计数进行自减,同时获取
        if (soaConfig.getEnableTopicRate() == 1) {
            totalMax.decrementAndGet();
            //获取主题
            topicPerMax.get(request.getTopicName()).decrementAndGet();
        }
    }
    return response;
}

saveMsg:保存消息,重点是doSaveMsg

代码语言:javascript
复制
//执行保存消息
    protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                             PublishMessageResponse response, QueueEntity temp) {
        // Transaction transaction = Tracer.newTransaction("PubInner-" +
        // temp.getIp(), request.getTopicName());
        //消息服务设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
        try {
            transaction.addData("topic", request.getTopicName());
            //执行批量插入
            message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
            // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
            if (soaConfig.getMqPushFlag() == 1) {// apollo开关
                //通知客户端
                notifyClient(temp);
            }
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
            response.setSuc(true);
            transaction.setStatus(Transaction.SUCCESS);
            return;
        } catch (Exception e) {
            // sendPublishFailMail(request, e, 1);
            transaction.setStatus(e);
            if (e instanceof DataIntegrityViolationException
                    || e.getCause() instanceof DataIntegrityViolationException) {
                response.setSuc(false);
                response.setMsg(e.getMessage());
                return;
            }
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // transaction.setStatus(e);
            throw new RuntimeException(e);
        } finally {
            transaction.complete();
        }
    }

重点看到:

代码语言:javascript
复制
message01Service.setDbId(temp.getDbNodeId());
//执行批量插入
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
// 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
if (soaConfig.getMqPushFlag() == 1) {// apollo开关
    //通知客户端
    notifyClient(temp);
}

这个是核心,发送消息,执行保存,将消息保存到数据库,同时通知客户端。

代码语言:javascript
复制
//通知客户端
public void notifyClient(QueueEntity queueEntity) {
    try {
        //队列偏移量的服务
        Map<Long, List<QueueOffsetEntity>> queueIdQueueOffsetMap = queueOffsetService.getQueueIdQueueOffsetMap();
        //获取缓存,消费队列map
        Map<String, ConsumerGroupEntity> consumerGroupMap = consumerGroupService.getCache();
        List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
        if (queueOffsetList == null) {
            return;
        }
        Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
        for (QueueOffsetEntity queueOffset : queueOffsetList) {
            // 如果消费者组开启了实时消息,则给对应的客户端发送异步通知。
            if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
                    && speedLimit(queueEntity.getId())) {

                ConsumerUtil.ConsumerVo consumerVo = ConsumerUtil.parseConsumerId(queueOffset.getConsumerName());
                if (StringUtils.isEmpty(consumerVo.port)) {
                    continue;
                }
                //客户端url 消费者vo里面有ip
                String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;

                if (!notifyMap.containsKey(clienturl)) {
                    notifyMap.put(clienturl, new ArrayList<>());
                }
                MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
                msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
                msgNotifyDto.setQueueId(queueEntity.getId());
                notifyMap.get(clienturl).add(msgNotifyDto);
            }
        }
        if (notifyMap.size() == 0) {
            return;
        }
        Transaction transaction = Tracer.newTransaction("mq-notify", "notifyClient");
        speedLimitMapRef.get().put(queueEntity.getId(), System.currentTimeMillis());
        for (String url : notifyMap.keySet()) {
            // 给对应的客户端发送拉取通知
            try {
                MsgNotifyRequest request = new MsgNotifyRequest();
                request.setMsgNotifyDtos(notifyMap.get(url));
                //执行发送异步,如果通知失败,每隔5秒释放一个线程请求,去探测。
                //重要
                if (notifyFailTentativeLimit(url)) {
                    httpClient.postAsyn(url + "/mq/client/notify", request, new NotifyCallBack(url));
                }

            } catch (Exception e) {
                log.error("给客户端发送拉取通知异常:", e);
            }
        }
        transaction.setStatus(Transaction.SUCCESS);
        transaction.complete();
    } catch (Exception e) {

    }
}

可以看到notify通知服务:

代码语言:javascript
复制
@RequestMapping("/mq/client/notify")
public void notify(@RequestBody MsgNotifyRequest request) {
   if (isOpenFlag()) {
      Transaction transaction = Tracer.newTransaction("mq-client", "/mq/client/notify");
      try {
         msgNotifyService.notify(request);
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      }
      transaction.complete();
   }
}

找到notify:

代码语言:javascript
复制
@Override
public void notify(MsgNotifyRequest request) {
    //消费poll操作服务
   IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
   Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
   if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
      request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
         if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
            IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
            Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
            if (queues.containsKey(msgNotifyDto.getQueueId())) {
               queues.get(msgNotifyDto.getQueueId()).notifyMsg();
            }
         }
      });
   }
}

找到notifyMsg:终于我们知道了我们想看到的方法doPullingData,是不是很高兴看到这个方法

代码语言:javascript
复制
@Override
public void notifyMsg() {
   createExecutorNotify();
   if (System.currentTimeMillis() > lastPullTime) {
      executorNotify.submit(new Runnable() {
         @Override
         public void run() {
            doPullingData();
         }
      });
   }
}

执行doPullingData:拉取消息,也即消费消息

代码语言:javascript
复制
protected boolean doPullingData() {
   if (pullFlag.compareAndSet(false, true)) {
      lastPullTime = System.currentTimeMillis();
      ConsumerQueueDto consumerQueueDto = consumerQueueRef.get();
      if (consumerQueueDto != null) {
         Transaction transaction = Tracer.newTransaction("mq-queue-pull",
               consumerQueueDto.getTopicName() + "-" + consumerQueueDto.getQueueId());
         TraceMessageItem traceMessageItem = new TraceMessageItem();
         try {
            request.setQueueId(consumerQueueDto.getQueueId());
            if (checkOffsetVersion(consumerQueueDto)) {
               consumerQueueDto.setLastId(lastId);
               request.setOffsetStart(lastId);
               request.setOffsetEnd(lastId + consumerQueueDto.getPullBatchSize());
               request.setConsumerGroupName(consumerQueueDto.getConsumerGroupName());
               request.setTopicName(consumerQueueDto.getTopicName());
               PullDataResponse response = null;
                //拉取数据pullData
               if (checkOffsetVersion(consumerQueueDto)) {
                  response = mqResource.pullData(request);
               }
               // PullDataResponse response = null;
               traceMessageItem.status = "拉取消息正常lastid-" + lastId;
               traceMessageItem.msg = "当前拉取lastid为:" + lastId + ",end:"
                     + (lastId + consumerQueueDto.getPullBatchSize()) + ",consumerGroupName:"
                     + consumerQueueDto.getConsumerGroupName() + ",topicName:"
                     + consumerQueueDto.getTopicName();
               if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
                  cacheData(response, consumerQueueDto);
                  transaction.setStatus(Transaction.SUCCESS);
                  return true;
               }
            }
            transaction.setStatus(Transaction.SUCCESS);
         } catch (Exception e) {
            traceMessageItem.status = "拉取消息失败";
            traceMessageItem.msg = e.getMessage();
            transaction.setStatus(e);
         } finally {
            traceMsgPull.add(traceMessageItem);
            transaction.complete();
             //设置成false,此时拉取完成
            pullFlag.set(false);
         }
      }
      return false;
   } else {
      return true;
   }

}

拉取消息:

代码语言:javascript
复制
public PullDataResponse pullData(PullDataRequest request) {
   if (request == null) {
      return null;
   }
   //CONSUMERPRE="/api/client/consumer"
    //url = /api/client/consumer/pullData
   String url = MqConstanst.CONSUMERPRE + "/pullData";
   try {
      return post(request, url, 2, PullDataResponse.class, true);
   } catch (Exception e) {
      CatRequest request2 = new CatRequest();
      request2.setMethod("pullData");
      request2.setJson(JsonUtil.toJson(request));
      request2.setMsg(e.getMessage());
      addCat(request2);
      throw e;
   }
}

找到这个方法,拉取消息:

代码语言:javascript
复制
//拉取数据操作
@PostMapping("/pullData")
public PullDataResponse pullData(@RequestBody PullDataRequest request) {
   PullDataResponse response = consumerService.pullData(request);
   return response;
}

拉取消息:

代码语言:javascript
复制
//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
    PullDataResponse response = new PullDataResponse();
    response.setSuc(true);
    Map<Long, QueueEntity> data = queueService.getAllQueueMap();
    checkVaild(request, response, data);
    if (!response.isSuc()) {
        return response;
    }
    //获取消息,通过队列id获取
    QueueEntity temp = data.get(request.getQueueId());
    Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
    List<Message01Entity> entities = new ArrayList<>();
    Transaction transaction = null;
    if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
        //设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
        try {
            //获取消息,批量
            entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
            transaction.setStatus(Transaction.SUCCESS);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        } catch (Exception e) {
            transaction.setStatus(e);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // TODO: handle exception
        }

    } else {
        transaction = Tracer.newTransaction("PullData", "PullData-wait");
        transaction.setStatus(Transaction.SUCCESS);

    }
    transaction.complete();
    List<MessageDto> messageDtos = converMessageDto(entities);
    response.setMsgs(messageDtos);
    return response;
}

找到获取消息重要方法:

代码语言:javascript
复制
//设置数据库id
message01Service.setDbId(temp.getDbNodeId());
 message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
List<MessageDto> messageDtos = converMessageDto(entities);
response.setMsgs(messageDtos);        
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档