需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。
过程:生产者发送消息publish->基于httpclient请求到mq-rest中的ConsumerController的publish,然后请求了publish,然后在消费者实现里面执行保存消息doSaveMsg,同时执行notifyClient操作,到notifyMsg,执行notify的过程中会执行doPollingData->pullData操作,请求到mq-rest,然后执行pullData操作。
其流程如下:
pmq流程图
pullData
public boolean publish(PublishMessageRequest request, int retryTimes) { //CONSUMERPRE="/api/client/consumer" url是/api/client/consumer/publish String url = MqConstanst.CONSUMERPRE + "/publish"; long start = System.nanoTime(); PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true); }
找到这个url的请求,按照我们以前的习惯,一个请求过来,通常会请求到controller,因此我们可以找到:
mq-rest中的ConsumerController,它相当于一个中介,将数据存储和发送消息publish联系起来,在mq-biz中将消息放入分配好的写队列、主题队列,进行存储。
@RestController @RequestMapping(MqConstanst.CONSUMERPRE) public class ConsumerController { //执行publish操作 @PostMapping("/publish") public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) { setSubEnv(request); //执行publish操作 PublishMessageResponse response = consumerService.publish(request); return response; }
此时可以在mq-biz中找到具体的实现:
此时思考一下,发送消息必须经过的流程,将消息进行存储,然后通知消费者,可以消费了。
//publish操作 @Override public PublishMessageResponse publish(PublishMessageRequest request) { PublishMessageResponse response = new PublishMessageResponse(); //校验 checkVaild(request, response); if (!response.isSuc()) { return response; } try { if (!checkTopicRate(request, response)) { return response; } //获取所有分配好的topic写队列 Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue(); //获取所以分配好的主题队列 Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue(); if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) { List<QueueEntity> queueEntities = queueMap.get(request.getTopicName()); if (queueEntities == null || queueEntities.size() == 0) { response.setSuc(false); response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!"); //如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存 if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) { queueEntities = topicQueueMap.get(request.getTopicName()); updateQueueCache(request.getTopicName()); } else { updateQueueCache(request.getTopicName()); return response; } } //如果队列实体列表>0,则执行保存消息操作 if (queueEntities.size() > 0) { saveMsg(request, response, queueEntities); } } else { response.setSuc(false); response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!"); return response; } } catch (Exception e) { log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e); response.setSuc(false); response.setMsg(e.getMessage()); } finally { //最终,将计数进行自减,同时获取 if (soaConfig.getEnableTopicRate() == 1) { totalMax.decrementAndGet(); //获取主题 topicPerMax.get(request.getTopicName()).decrementAndGet(); } } return response; }
saveMsg:保存消息,重点是doSaveMsg
//执行保存消息 protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request, PublishMessageResponse response, QueueEntity temp) { // Transaction transaction = Tracer.newTransaction("PubInner-" + // temp.getIp(), request.getTopicName()); //消息服务设置数据库id message01Service.setDbId(temp.getDbNodeId()); Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp()); try { transaction.addData("topic", request.getTopicName()); //执行批量插入 message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities); // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知 if (soaConfig.getMqPushFlag() == 1) {// apollo开关 //通知客户端 notifyClient(temp); } dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L); response.setSuc(true); transaction.setStatus(Transaction.SUCCESS); return; } catch (Exception e) { // sendPublishFailMail(request, e, 1); transaction.setStatus(e); if (e instanceof DataIntegrityViolationException || e.getCause() instanceof DataIntegrityViolationException) { response.setSuc(false); response.setMsg(e.getMessage()); return; } dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis()); // transaction.setStatus(e); throw new RuntimeException(e); } finally { transaction.complete(); } }
重点看到:
message01Service.setDbId(temp.getDbNodeId()); //执行批量插入 message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities); // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知 if (soaConfig.getMqPushFlag() == 1) {// apollo开关 //通知客户端 notifyClient(temp); }
这个是核心,发送消息,执行保存,将消息保存到数据库,同时通知客户端。
//通知客户端 public void notifyClient(QueueEntity queueEntity) { try { //队列偏移量的服务 Map<Long, List<QueueOffsetEntity>> queueIdQueueOffsetMap = queueOffsetService.getQueueIdQueueOffsetMap(); //获取缓存,消费队列map Map<String, ConsumerGroupEntity> consumerGroupMap = consumerGroupService.getCache(); List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId()); if (queueOffsetList == null) { return; } Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>(); for (QueueOffsetEntity queueOffset : queueOffsetList) { // 如果消费者组开启了实时消息,则给对应的客户端发送异步通知。 if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1 && speedLimit(queueEntity.getId())) { ConsumerUtil.ConsumerVo consumerVo = ConsumerUtil.parseConsumerId(queueOffset.getConsumerName()); if (StringUtils.isEmpty(consumerVo.port)) { continue; } //客户端url 消费者vo里面有ip String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port; if (!notifyMap.containsKey(clienturl)) { notifyMap.put(clienturl, new ArrayList<>()); } MsgNotifyDto msgNotifyDto = new MsgNotifyDto(); msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName()); msgNotifyDto.setQueueId(queueEntity.getId()); notifyMap.get(clienturl).add(msgNotifyDto); } } if (notifyMap.size() == 0) { return; } Transaction transaction = Tracer.newTransaction("mq-notify", "notifyClient"); speedLimitMapRef.get().put(queueEntity.getId(), System.currentTimeMillis()); for (String url : notifyMap.keySet()) { // 给对应的客户端发送拉取通知 try { MsgNotifyRequest request = new MsgNotifyRequest(); request.setMsgNotifyDtos(notifyMap.get(url)); //执行发送异步,如果通知失败,每隔5秒释放一个线程请求,去探测。 //重要 if (notifyFailTentativeLimit(url)) { httpClient.postAsyn(url + "/mq/client/notify", request, new NotifyCallBack(url)); } } catch (Exception e) { log.error("给客户端发送拉取通知异常:", e); } } transaction.setStatus(Transaction.SUCCESS); transaction.complete(); } catch (Exception e) { } }
可以看到notify通知服务:
@RequestMapping("/mq/client/notify") public void notify(@RequestBody MsgNotifyRequest request) { if (isOpenFlag()) { Transaction transaction = Tracer.newTransaction("mq-client", "/mq/client/notify"); try { msgNotifyService.notify(request); transaction.setStatus(Transaction.SUCCESS); } catch (Exception e) { transaction.setStatus(e); } transaction.complete(); } }
找到notify:
@Override public void notify(MsgNotifyRequest request) { //消费poll操作服务 IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService(); Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors(); if (groups != null && request != null && request.getMsgNotifyDtos() != null) { request.getMsgNotifyDtos().forEach(msgNotifyDto -> { if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) { IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName()); Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx(); if (queues.containsKey(msgNotifyDto.getQueueId())) { queues.get(msgNotifyDto.getQueueId()).notifyMsg(); } } }); } }
找到notifyMsg:终于我们知道了我们想看到的方法doPullingData,是不是很高兴看到这个方法
@Override public void notifyMsg() { createExecutorNotify(); if (System.currentTimeMillis() > lastPullTime) { executorNotify.submit(new Runnable() { @Override public void run() { doPullingData(); } }); } }
执行doPullingData:拉取消息,也即消费消息
protected boolean doPullingData() { if (pullFlag.compareAndSet(false, true)) { lastPullTime = System.currentTimeMillis(); ConsumerQueueDto consumerQueueDto = consumerQueueRef.get(); if (consumerQueueDto != null) { Transaction transaction = Tracer.newTransaction("mq-queue-pull", consumerQueueDto.getTopicName() + "-" + consumerQueueDto.getQueueId()); TraceMessageItem traceMessageItem = new TraceMessageItem(); try { request.setQueueId(consumerQueueDto.getQueueId()); if (checkOffsetVersion(consumerQueueDto)) { consumerQueueDto.setLastId(lastId); request.setOffsetStart(lastId); request.setOffsetEnd(lastId + consumerQueueDto.getPullBatchSize()); request.setConsumerGroupName(consumerQueueDto.getConsumerGroupName()); request.setTopicName(consumerQueueDto.getTopicName()); PullDataResponse response = null; //拉取数据pullData if (checkOffsetVersion(consumerQueueDto)) { response = mqResource.pullData(request); } // PullDataResponse response = null; traceMessageItem.status = "拉取消息正常lastid-" + lastId; traceMessageItem.msg = "当前拉取lastid为:" + lastId + ",end:" + (lastId + consumerQueueDto.getPullBatchSize()) + ",consumerGroupName:" + consumerQueueDto.getConsumerGroupName() + ",topicName:" + consumerQueueDto.getTopicName(); if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) { cacheData(response, consumerQueueDto); transaction.setStatus(Transaction.SUCCESS); return true; } } transaction.setStatus(Transaction.SUCCESS); } catch (Exception e) { traceMessageItem.status = "拉取消息失败"; traceMessageItem.msg = e.getMessage(); transaction.setStatus(e); } finally { traceMsgPull.add(traceMessageItem); transaction.complete(); //设置成false,此时拉取完成 pullFlag.set(false); } } return false; } else { return true; } }
拉取消息:
public PullDataResponse pullData(PullDataRequest request) { if (request == null) { return null; } //CONSUMERPRE="/api/client/consumer" //url = /api/client/consumer/pullData String url = MqConstanst.CONSUMERPRE + "/pullData"; try { return post(request, url, 2, PullDataResponse.class, true); } catch (Exception e) { CatRequest request2 = new CatRequest(); request2.setMethod("pullData"); request2.setJson(JsonUtil.toJson(request)); request2.setMsg(e.getMessage()); addCat(request2); throw e; } }
找到这个方法,拉取消息:
//拉取数据操作 @PostMapping("/pullData") public PullDataResponse pullData(@RequestBody PullDataRequest request) { PullDataResponse response = consumerService.pullData(request); return response; }
拉取消息:
//拉取数据,消费消息 @Override public PullDataResponse pullData(PullDataRequest request) { PullDataResponse response = new PullDataResponse(); response.setSuc(true); Map<Long, QueueEntity> data = queueService.getAllQueueMap(); checkVaild(request, response, data); if (!response.isSuc()) { return response; } //获取消息,通过队列id获取 QueueEntity temp = data.get(request.getQueueId()); Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache(); List<Message01Entity> entities = new ArrayList<>(); Transaction transaction = null; if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) { //设置数据库id message01Service.setDbId(temp.getDbNodeId()); transaction = Tracer.newTransaction("Pull-Data", temp.getIp()); try { //获取消息,批量 entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(), request.getOffsetEnd()); transaction.setStatus(Transaction.SUCCESS); dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L); } catch (Exception e) { transaction.setStatus(e); dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis()); // TODO: handle exception } } else { transaction = Tracer.newTransaction("PullData", "PullData-wait"); transaction.setStatus(Transaction.SUCCESS); } transaction.complete(); List<MessageDto> messageDtos = converMessageDto(entities); response.setMsgs(messageDtos); return response; }
找到获取消息重要方法:
//设置数据库id message01Service.setDbId(temp.getDbNodeId()); message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(), request.getOffsetEnd()); List<MessageDto> messageDtos = converMessageDto(entities); response.setMsgs(messageDtos);
本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2021-01-09
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句