专栏首页后端技术学习pmq学习四-生产消息到存储到消费的过程

pmq学习四-生产消息到存储到消费的过程

需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。

过程:生产者发送消息publish->基于httpclient请求到mq-rest中的ConsumerController的publish,然后请求了publish,然后在消费者实现里面执行保存消息doSaveMsg,同时执行notifyClient操作,到notifyMsg,执行notify的过程中会执行doPollingData->pullData操作,请求到mq-rest,然后执行pullData操作。

其流程如下:

pmq流程图

pullData

public boolean publish(PublishMessageRequest request, int retryTimes) {
    //CONSUMERPRE="/api/client/consumer" url是/api/client/consumer/publish
    String url = MqConstanst.CONSUMERPRE + "/publish";
            long start = System.nanoTime();
            PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
}

找到这个url的请求,按照我们以前的习惯,一个请求过来,通常会请求到controller,因此我们可以找到:

mq-rest中的ConsumerController,它相当于一个中介,将数据存储和发送消息publish联系起来,在mq-biz中将消息放入分配好的写队列、主题队列,进行存储。

@RestController
@RequestMapping(MqConstanst.CONSUMERPRE)
public class ConsumerController {    
    //执行publish操作
    @PostMapping("/publish")
    public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
       setSubEnv(request);
        //执行publish操作
       PublishMessageResponse response = consumerService.publish(request);
       return response;

}

此时可以在mq-biz中找到具体的实现:

此时思考一下,发送消息必须经过的流程,将消息进行存储,然后通知消费者,可以消费了。

//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
    PublishMessageResponse response = new PublishMessageResponse();
    //校验
    checkVaild(request, response);
    if (!response.isSuc()) {
        return response;
    }
    try {
        if (!checkTopicRate(request, response)) {
            return response;
        }
        //获取所有分配好的topic写队列
        Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
        //获取所以分配好的主题队列
        Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
        if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
            List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
            if (queueEntities == null || queueEntities.size() == 0) {
                response.setSuc(false);
                response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
                //如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
                if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                    queueEntities = topicQueueMap.get(request.getTopicName());
                    updateQueueCache(request.getTopicName());
                } else {
                    updateQueueCache(request.getTopicName());
                    return response;
                }
            }
            //如果队列实体列表>0,则执行保存消息操作
            if (queueEntities.size() > 0) {
                saveMsg(request, response, queueEntities);
            }
        } else {
            response.setSuc(false);
            response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
            return response;
        }
    } catch (Exception e) {
        log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
        response.setSuc(false);
        response.setMsg(e.getMessage());
    } finally {
        //最终,将计数进行自减,同时获取
        if (soaConfig.getEnableTopicRate() == 1) {
            totalMax.decrementAndGet();
            //获取主题
            topicPerMax.get(request.getTopicName()).decrementAndGet();
        }
    }
    return response;
}

saveMsg:保存消息,重点是doSaveMsg

//执行保存消息
    protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                             PublishMessageResponse response, QueueEntity temp) {
        // Transaction transaction = Tracer.newTransaction("PubInner-" +
        // temp.getIp(), request.getTopicName());
        //消息服务设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
        try {
            transaction.addData("topic", request.getTopicName());
            //执行批量插入
            message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
            // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
            if (soaConfig.getMqPushFlag() == 1) {// apollo开关
                //通知客户端
                notifyClient(temp);
            }
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
            response.setSuc(true);
            transaction.setStatus(Transaction.SUCCESS);
            return;
        } catch (Exception e) {
            // sendPublishFailMail(request, e, 1);
            transaction.setStatus(e);
            if (e instanceof DataIntegrityViolationException
                    || e.getCause() instanceof DataIntegrityViolationException) {
                response.setSuc(false);
                response.setMsg(e.getMessage());
                return;
            }
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // transaction.setStatus(e);
            throw new RuntimeException(e);
        } finally {
            transaction.complete();
        }
    }

重点看到:

message01Service.setDbId(temp.getDbNodeId());
//执行批量插入
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
// 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
if (soaConfig.getMqPushFlag() == 1) {// apollo开关
    //通知客户端
    notifyClient(temp);
}

这个是核心,发送消息,执行保存,将消息保存到数据库,同时通知客户端。

//通知客户端
public void notifyClient(QueueEntity queueEntity) {
    try {
        //队列偏移量的服务
        Map<Long, List<QueueOffsetEntity>> queueIdQueueOffsetMap = queueOffsetService.getQueueIdQueueOffsetMap();
        //获取缓存,消费队列map
        Map<String, ConsumerGroupEntity> consumerGroupMap = consumerGroupService.getCache();
        List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
        if (queueOffsetList == null) {
            return;
        }
        Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
        for (QueueOffsetEntity queueOffset : queueOffsetList) {
            // 如果消费者组开启了实时消息,则给对应的客户端发送异步通知。
            if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
                    && speedLimit(queueEntity.getId())) {

                ConsumerUtil.ConsumerVo consumerVo = ConsumerUtil.parseConsumerId(queueOffset.getConsumerName());
                if (StringUtils.isEmpty(consumerVo.port)) {
                    continue;
                }
                //客户端url 消费者vo里面有ip
                String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;

                if (!notifyMap.containsKey(clienturl)) {
                    notifyMap.put(clienturl, new ArrayList<>());
                }
                MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
                msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
                msgNotifyDto.setQueueId(queueEntity.getId());
                notifyMap.get(clienturl).add(msgNotifyDto);
            }
        }
        if (notifyMap.size() == 0) {
            return;
        }
        Transaction transaction = Tracer.newTransaction("mq-notify", "notifyClient");
        speedLimitMapRef.get().put(queueEntity.getId(), System.currentTimeMillis());
        for (String url : notifyMap.keySet()) {
            // 给对应的客户端发送拉取通知
            try {
                MsgNotifyRequest request = new MsgNotifyRequest();
                request.setMsgNotifyDtos(notifyMap.get(url));
                //执行发送异步,如果通知失败,每隔5秒释放一个线程请求,去探测。
                //重要
                if (notifyFailTentativeLimit(url)) {
                    httpClient.postAsyn(url + "/mq/client/notify", request, new NotifyCallBack(url));
                }

            } catch (Exception e) {
                log.error("给客户端发送拉取通知异常:", e);
            }
        }
        transaction.setStatus(Transaction.SUCCESS);
        transaction.complete();
    } catch (Exception e) {

    }
}

可以看到notify通知服务:

@RequestMapping("/mq/client/notify")
public void notify(@RequestBody MsgNotifyRequest request) {
   if (isOpenFlag()) {
      Transaction transaction = Tracer.newTransaction("mq-client", "/mq/client/notify");
      try {
         msgNotifyService.notify(request);
         transaction.setStatus(Transaction.SUCCESS);
      } catch (Exception e) {
         transaction.setStatus(e);
      }
      transaction.complete();
   }
}

找到notify:

@Override
public void notify(MsgNotifyRequest request) {
    //消费poll操作服务
   IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
   Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
   if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
      request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
         if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
            IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
            Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
            if (queues.containsKey(msgNotifyDto.getQueueId())) {
               queues.get(msgNotifyDto.getQueueId()).notifyMsg();
            }
         }
      });
   }
}

找到notifyMsg:终于我们知道了我们想看到的方法doPullingData,是不是很高兴看到这个方法

@Override
public void notifyMsg() {
   createExecutorNotify();
   if (System.currentTimeMillis() > lastPullTime) {
      executorNotify.submit(new Runnable() {
         @Override
         public void run() {
            doPullingData();
         }
      });
   }
}

执行doPullingData:拉取消息,也即消费消息

protected boolean doPullingData() {
   if (pullFlag.compareAndSet(false, true)) {
      lastPullTime = System.currentTimeMillis();
      ConsumerQueueDto consumerQueueDto = consumerQueueRef.get();
      if (consumerQueueDto != null) {
         Transaction transaction = Tracer.newTransaction("mq-queue-pull",
               consumerQueueDto.getTopicName() + "-" + consumerQueueDto.getQueueId());
         TraceMessageItem traceMessageItem = new TraceMessageItem();
         try {
            request.setQueueId(consumerQueueDto.getQueueId());
            if (checkOffsetVersion(consumerQueueDto)) {
               consumerQueueDto.setLastId(lastId);
               request.setOffsetStart(lastId);
               request.setOffsetEnd(lastId + consumerQueueDto.getPullBatchSize());
               request.setConsumerGroupName(consumerQueueDto.getConsumerGroupName());
               request.setTopicName(consumerQueueDto.getTopicName());
               PullDataResponse response = null;
                //拉取数据pullData
               if (checkOffsetVersion(consumerQueueDto)) {
                  response = mqResource.pullData(request);
               }
               // PullDataResponse response = null;
               traceMessageItem.status = "拉取消息正常lastid-" + lastId;
               traceMessageItem.msg = "当前拉取lastid为:" + lastId + ",end:"
                     + (lastId + consumerQueueDto.getPullBatchSize()) + ",consumerGroupName:"
                     + consumerQueueDto.getConsumerGroupName() + ",topicName:"
                     + consumerQueueDto.getTopicName();
               if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
                  cacheData(response, consumerQueueDto);
                  transaction.setStatus(Transaction.SUCCESS);
                  return true;
               }
            }
            transaction.setStatus(Transaction.SUCCESS);
         } catch (Exception e) {
            traceMessageItem.status = "拉取消息失败";
            traceMessageItem.msg = e.getMessage();
            transaction.setStatus(e);
         } finally {
            traceMsgPull.add(traceMessageItem);
            transaction.complete();
             //设置成false,此时拉取完成
            pullFlag.set(false);
         }
      }
      return false;
   } else {
      return true;
   }

}

拉取消息:

public PullDataResponse pullData(PullDataRequest request) {
   if (request == null) {
      return null;
   }
   //CONSUMERPRE="/api/client/consumer"
    //url = /api/client/consumer/pullData
   String url = MqConstanst.CONSUMERPRE + "/pullData";
   try {
      return post(request, url, 2, PullDataResponse.class, true);
   } catch (Exception e) {
      CatRequest request2 = new CatRequest();
      request2.setMethod("pullData");
      request2.setJson(JsonUtil.toJson(request));
      request2.setMsg(e.getMessage());
      addCat(request2);
      throw e;
   }
}

找到这个方法,拉取消息:

//拉取数据操作
@PostMapping("/pullData")
public PullDataResponse pullData(@RequestBody PullDataRequest request) {
   PullDataResponse response = consumerService.pullData(request);
   return response;
}

拉取消息:

//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
    PullDataResponse response = new PullDataResponse();
    response.setSuc(true);
    Map<Long, QueueEntity> data = queueService.getAllQueueMap();
    checkVaild(request, response, data);
    if (!response.isSuc()) {
        return response;
    }
    //获取消息,通过队列id获取
    QueueEntity temp = data.get(request.getQueueId());
    Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
    List<Message01Entity> entities = new ArrayList<>();
    Transaction transaction = null;
    if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
        //设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
        try {
            //获取消息,批量
            entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
            transaction.setStatus(Transaction.SUCCESS);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        } catch (Exception e) {
            transaction.setStatus(e);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // TODO: handle exception
        }

    } else {
        transaction = Tracer.newTransaction("PullData", "PullData-wait");
        transaction.setStatus(Transaction.SUCCESS);

    }
    transaction.complete();
    List<MessageDto> messageDtos = converMessageDto(entities);
    response.setMsgs(messageDtos);
    return response;
}

找到获取消息重要方法:

//设置数据库id
message01Service.setDbId(temp.getDbNodeId());
 message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
List<MessageDto> messageDtos = converMessageDto(entities);
response.setMsgs(messageDtos);        

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-01-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • pmq学习二-生产者和消费者流程

    学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。

    路行的亚洲
  • pmq学习六-broker启动

    如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,

    路行的亚洲
  • JVM学习一

    jvm的内存结构:可以看到我们的java文件会首先编译成class文件,经过类加载器进行加载,然后经过jvm的相关区域:f方法区、堆、虚拟机栈、程序计数器、本地...

    路行的亚洲
  • 【一起学源码-微服务】Feign 源码三:Feign结合Ribbon实现负载均衡的原理分析

    上一讲我们已经知道了Feign的工作原理其实是在项目启动的时候,通过JDK动态代理为每个FeignClinent生成一个动态代理。

    一枝花算不算浪漫
  • 聊聊springboot session timeout参数设置

    本文主要介绍下spring boot中对session timeout参数值的设置过程。

    codecraft
  • SpringMVC工作原理流程(二)

    当有请求过来,首先会先调用HttpServlet的service(ServletRequest req, ServletResponse res)方法,在ser...

    秋白
  • 深入源码分析SpringMVC执行过程

    首先,让我们从 Spring MVC 的四大组件:前端控制器(DispatcherServlet)、处理器映射器(HandlerMapping)、处理器适配器(...

    黄泽杰
  • Android Volley完全解析(四),带你从源码的角度理解Volley

    经过前三篇文章的学习,Volley的用法我们已经掌握的差不多了,但是对于Volley的工作原理,恐怕有很多朋友还不是很清楚。因此,本篇文章中我们就来一起阅读一下...

    用户1158055
  • 深入源码分析SpringMVC执行过程

    首先,让我们从 Spring MVC 的四大组件:前端控制器(DispatcherServlet)、处理器映射器(HandlerMapping)、处理器适配器(...

    武培轩
  • Django之XSS攻击

        xss跨站脚本攻击(Cross site script,简称xss)是一种“HTML注入”,由于攻击的脚本多数时候是跨域的,所以称之为“跨域脚本”。 ...

    超蛋lhy

扫码关注云+社区

领取腾讯云代金券