需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。
过程:生产者发送消息publish->基于httpclient请求到mq-rest中的ConsumerController的publish,然后请求了publish,然后在消费者实现里面执行保存消息doSaveMsg,同时执行notifyClient操作,到notifyMsg,执行notify的过程中会执行doPollingData->pullData操作,请求到mq-rest,然后执行pullData操作。
其流程如下:
pmq流程图
pullData
public boolean publish(PublishMessageRequest request, int retryTimes) {
//CONSUMERPRE="/api/client/consumer" url是/api/client/consumer/publish
String url = MqConstanst.CONSUMERPRE + "/publish";
long start = System.nanoTime();
PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
}
找到这个url的请求,按照我们以前的习惯,一个请求过来,通常会请求到controller,因此我们可以找到:
mq-rest中的ConsumerController,它相当于一个中介,将数据存储和发送消息publish联系起来,在mq-biz中将消息放入分配好的写队列、主题队列,进行存储。
@RestController
@RequestMapping(MqConstanst.CONSUMERPRE)
public class ConsumerController {
//执行publish操作
@PostMapping("/publish")
public PublishMessageResponse publish(@RequestBody PublishMessageRequest request) {
setSubEnv(request);
//执行publish操作
PublishMessageResponse response = consumerService.publish(request);
return response;
}
此时可以在mq-biz中找到具体的实现:
此时思考一下,发送消息必须经过的流程,将消息进行存储,然后通知消费者,可以消费了。
//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
PublishMessageResponse response = new PublishMessageResponse();
//校验
checkVaild(request, response);
if (!response.isSuc()) {
return response;
}
try {
if (!checkTopicRate(request, response)) {
return response;
}
//获取所有分配好的topic写队列
Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
//获取所以分配好的主题队列
Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
if (queueEntities == null || queueEntities.size() == 0) {
response.setSuc(false);
response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
//如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
queueEntities = topicQueueMap.get(request.getTopicName());
updateQueueCache(request.getTopicName());
} else {
updateQueueCache(request.getTopicName());
return response;
}
}
//如果队列实体列表>0,则执行保存消息操作
if (queueEntities.size() > 0) {
saveMsg(request, response, queueEntities);
}
} else {
response.setSuc(false);
response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
return response;
}
} catch (Exception e) {
log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
response.setSuc(false);
response.setMsg(e.getMessage());
} finally {
//最终,将计数进行自减,同时获取
if (soaConfig.getEnableTopicRate() == 1) {
totalMax.decrementAndGet();
//获取主题
topicPerMax.get(request.getTopicName()).decrementAndGet();
}
}
return response;
}
saveMsg:保存消息,重点是doSaveMsg
//执行保存消息
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
PublishMessageResponse response, QueueEntity temp) {
// Transaction transaction = Tracer.newTransaction("PubInner-" +
// temp.getIp(), request.getTopicName());
//消息服务设置数据库id
message01Service.setDbId(temp.getDbNodeId());
Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
try {
transaction.addData("topic", request.getTopicName());
//执行批量插入
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
// 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
if (soaConfig.getMqPushFlag() == 1) {// apollo开关
//通知客户端
notifyClient(temp);
}
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
response.setSuc(true);
transaction.setStatus(Transaction.SUCCESS);
return;
} catch (Exception e) {
// sendPublishFailMail(request, e, 1);
transaction.setStatus(e);
if (e instanceof DataIntegrityViolationException
|| e.getCause() instanceof DataIntegrityViolationException) {
response.setSuc(false);
response.setMsg(e.getMessage());
return;
}
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
// transaction.setStatus(e);
throw new RuntimeException(e);
} finally {
transaction.complete();
}
}
重点看到:
message01Service.setDbId(temp.getDbNodeId());
//执行批量插入
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
// 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
if (soaConfig.getMqPushFlag() == 1) {// apollo开关
//通知客户端
notifyClient(temp);
}
这个是核心,发送消息,执行保存,将消息保存到数据库,同时通知客户端。
//通知客户端
public void notifyClient(QueueEntity queueEntity) {
try {
//队列偏移量的服务
Map<Long, List<QueueOffsetEntity>> queueIdQueueOffsetMap = queueOffsetService.getQueueIdQueueOffsetMap();
//获取缓存,消费队列map
Map<String, ConsumerGroupEntity> consumerGroupMap = consumerGroupService.getCache();
List<QueueOffsetEntity> queueOffsetList = queueIdQueueOffsetMap.get(queueEntity.getId());
if (queueOffsetList == null) {
return;
}
Map<String, List<MsgNotifyDto>> notifyMap = new HashMap<>();
for (QueueOffsetEntity queueOffset : queueOffsetList) {
// 如果消费者组开启了实时消息,则给对应的客户端发送异步通知。
if (consumerGroupMap.get(queueOffset.getConsumerGroupName()).getPushFlag() == 1
&& speedLimit(queueEntity.getId())) {
ConsumerUtil.ConsumerVo consumerVo = ConsumerUtil.parseConsumerId(queueOffset.getConsumerName());
if (StringUtils.isEmpty(consumerVo.port)) {
continue;
}
//客户端url 消费者vo里面有ip
String clienturl = "http://" + consumerVo.ip + ":" + consumerVo.port;
if (!notifyMap.containsKey(clienturl)) {
notifyMap.put(clienturl, new ArrayList<>());
}
MsgNotifyDto msgNotifyDto = new MsgNotifyDto();
msgNotifyDto.setConsumerGroupName(queueOffset.getConsumerGroupName());
msgNotifyDto.setQueueId(queueEntity.getId());
notifyMap.get(clienturl).add(msgNotifyDto);
}
}
if (notifyMap.size() == 0) {
return;
}
Transaction transaction = Tracer.newTransaction("mq-notify", "notifyClient");
speedLimitMapRef.get().put(queueEntity.getId(), System.currentTimeMillis());
for (String url : notifyMap.keySet()) {
// 给对应的客户端发送拉取通知
try {
MsgNotifyRequest request = new MsgNotifyRequest();
request.setMsgNotifyDtos(notifyMap.get(url));
//执行发送异步,如果通知失败,每隔5秒释放一个线程请求,去探测。
//重要
if (notifyFailTentativeLimit(url)) {
httpClient.postAsyn(url + "/mq/client/notify", request, new NotifyCallBack(url));
}
} catch (Exception e) {
log.error("给客户端发送拉取通知异常:", e);
}
}
transaction.setStatus(Transaction.SUCCESS);
transaction.complete();
} catch (Exception e) {
}
}
可以看到notify通知服务:
@RequestMapping("/mq/client/notify")
public void notify(@RequestBody MsgNotifyRequest request) {
if (isOpenFlag()) {
Transaction transaction = Tracer.newTransaction("mq-client", "/mq/client/notify");
try {
msgNotifyService.notify(request);
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
transaction.setStatus(e);
}
transaction.complete();
}
}
找到notify:
@Override
public void notify(MsgNotifyRequest request) {
//消费poll操作服务
IConsumerPollingService consumerPollingService = MqClient.getMqFactory().createConsumerPollingService();
Map<String, IMqGroupExcutorService> groups = consumerPollingService.getMqExcutors();
if (groups != null && request != null && request.getMsgNotifyDtos() != null) {
request.getMsgNotifyDtos().forEach(msgNotifyDto -> {
if (groups.containsKey(msgNotifyDto.getConsumerGroupName())) {
IMqGroupExcutorService iMqGroupExcutorService = groups.get(msgNotifyDto.getConsumerGroupName());
Map<Long, IMqQueueExcutorService> queues = iMqGroupExcutorService.getQueueEx();
if (queues.containsKey(msgNotifyDto.getQueueId())) {
queues.get(msgNotifyDto.getQueueId()).notifyMsg();
}
}
});
}
}
找到notifyMsg:终于我们知道了我们想看到的方法doPullingData,是不是很高兴看到这个方法
@Override
public void notifyMsg() {
createExecutorNotify();
if (System.currentTimeMillis() > lastPullTime) {
executorNotify.submit(new Runnable() {
@Override
public void run() {
doPullingData();
}
});
}
}
执行doPullingData:拉取消息,也即消费消息
protected boolean doPullingData() {
if (pullFlag.compareAndSet(false, true)) {
lastPullTime = System.currentTimeMillis();
ConsumerQueueDto consumerQueueDto = consumerQueueRef.get();
if (consumerQueueDto != null) {
Transaction transaction = Tracer.newTransaction("mq-queue-pull",
consumerQueueDto.getTopicName() + "-" + consumerQueueDto.getQueueId());
TraceMessageItem traceMessageItem = new TraceMessageItem();
try {
request.setQueueId(consumerQueueDto.getQueueId());
if (checkOffsetVersion(consumerQueueDto)) {
consumerQueueDto.setLastId(lastId);
request.setOffsetStart(lastId);
request.setOffsetEnd(lastId + consumerQueueDto.getPullBatchSize());
request.setConsumerGroupName(consumerQueueDto.getConsumerGroupName());
request.setTopicName(consumerQueueDto.getTopicName());
PullDataResponse response = null;
//拉取数据pullData
if (checkOffsetVersion(consumerQueueDto)) {
response = mqResource.pullData(request);
}
// PullDataResponse response = null;
traceMessageItem.status = "拉取消息正常lastid-" + lastId;
traceMessageItem.msg = "当前拉取lastid为:" + lastId + ",end:"
+ (lastId + consumerQueueDto.getPullBatchSize()) + ",consumerGroupName:"
+ consumerQueueDto.getConsumerGroupName() + ",topicName:"
+ consumerQueueDto.getTopicName();
if (response != null && response.getMsgs() != null && response.getMsgs().size() > 0) {
cacheData(response, consumerQueueDto);
transaction.setStatus(Transaction.SUCCESS);
return true;
}
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
traceMessageItem.status = "拉取消息失败";
traceMessageItem.msg = e.getMessage();
transaction.setStatus(e);
} finally {
traceMsgPull.add(traceMessageItem);
transaction.complete();
//设置成false,此时拉取完成
pullFlag.set(false);
}
}
return false;
} else {
return true;
}
}
拉取消息:
public PullDataResponse pullData(PullDataRequest request) {
if (request == null) {
return null;
}
//CONSUMERPRE="/api/client/consumer"
//url = /api/client/consumer/pullData
String url = MqConstanst.CONSUMERPRE + "/pullData";
try {
return post(request, url, 2, PullDataResponse.class, true);
} catch (Exception e) {
CatRequest request2 = new CatRequest();
request2.setMethod("pullData");
request2.setJson(JsonUtil.toJson(request));
request2.setMsg(e.getMessage());
addCat(request2);
throw e;
}
}
找到这个方法,拉取消息:
//拉取数据操作
@PostMapping("/pullData")
public PullDataResponse pullData(@RequestBody PullDataRequest request) {
PullDataResponse response = consumerService.pullData(request);
return response;
}
拉取消息:
//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
PullDataResponse response = new PullDataResponse();
response.setSuc(true);
Map<Long, QueueEntity> data = queueService.getAllQueueMap();
checkVaild(request, response, data);
if (!response.isSuc()) {
return response;
}
//获取消息,通过队列id获取
QueueEntity temp = data.get(request.getQueueId());
Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
List<Message01Entity> entities = new ArrayList<>();
Transaction transaction = null;
if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
//设置数据库id
message01Service.setDbId(temp.getDbNodeId());
transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
try {
//获取消息,批量
entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
request.getOffsetEnd());
transaction.setStatus(Transaction.SUCCESS);
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
} catch (Exception e) {
transaction.setStatus(e);
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
// TODO: handle exception
}
} else {
transaction = Tracer.newTransaction("PullData", "PullData-wait");
transaction.setStatus(Transaction.SUCCESS);
}
transaction.complete();
List<MessageDto> messageDtos = converMessageDto(entities);
response.setMsgs(messageDtos);
return response;
}
找到获取消息重要方法:
//设置数据库id
message01Service.setDbId(temp.getDbNodeId());
message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
request.getOffsetEnd());
List<MessageDto> messageDtos = converMessageDto(entities);
response.setMsgs(messageDtos);