学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。
可以看到生产者和消费者都是基于pulish接口进行请求的,同时publish请求的实质是调用httpClient的post请求,而在调用post请求时,需要考虑各种异常的请求和失败的请求。而异常请求,则会将异常信息放入到cat链路中,而放入到cat的过程中,也是基于httpClient调用post请求,放入的。同时请求失败,会进行失败重试,而重试的次数是10次。最终失败会发生失败邮件,进行预警。同时里面有一个mqContext、catContext。同时生产者和消费者属于客户端,broker是服务端,这个和RocketMQ是类似的。
其流程:
pmq发送/消费消息流程
下面的代码来源于信也开源的pmq。
消息生产服务:
/** * 消息生产者服务 */ @RestController public class TestController { @GetMapping("/test1") public void test1(@RequestParam String topicName, @RequestParam int count) { if (Util.isEmpty(topicName)) return; //使用单例线程池提交任务 Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { for (int i = 1; i < count; i++) { try { //进行publish操作 MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i))); } catch (MqNotInitException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ContentExceed65535Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } Util.sleep(10); } } }); } }
消息消费服务:
/** * 消息消费服务 */ public class TestSub implements ISubscriber { @Override public List<Long> onMessageReceived(List<MessageDto> messages) { try { //接收消息,执行publish操作 MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody())); } catch (MqNotInitException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ContentExceed65535Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } }
下面我们来看一下:也即生产者和消费者都会调用一个接口publish接口
MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i))); //生产消息 MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody())); //消费消息
来看这两个方法,测试中的这两个方法都会调用publish方法:
public static boolean publish(String topic, String token, List<ProducerDataDto> messages, IPartitionSelector iPartitionSelector) throws MqNotInitException, ContentExceed65535Exception { // TODO Auto-generated method stub //如果没有初始化,则抛mq没有初始化异常 if (!hasInit()) { throw new MqNotInitException(); } //创建publishMessage请求对象 PublishMessageRequest request = null; //对消息进行校验 if (messages != null && messages.size() > 0) { for (ProducerDataDto t1 : messages) { if (MessageUtil.checkMessageExceed65535(t1.getBody())) { throw new ContentExceed65535Exception(); } t1.setPartitionInfo(getPartitionId(topic, t1, iPartitionSelector)); } //创建publishMessage请求对象,设置clientIp、token信息、topic名称、消息信息 try { request = new PublishMessageRequest(); request.setClientIp(mqContext.getConfig().getIp()); request.setToken(token); request.setTopicName(topic); request.setMsgs(messages); checkBody(messages); //调用mq上下文,进行publish操作 return mqContext.getMqResource().publish(request, mqContext.getConfig().getPbRetryTimes()); } catch (Exception e) { log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e); return false; } } return true; }
publish操作:
public boolean publish(PublishMessageRequest request, int retryTimes) { if (request == null) { return true; } //创建事务 Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName()); Timer.Context timer1 = MetricSingleton.getMetricRegistry() .timer("mq.client.publish.time?topic=" + request.getTopicName()).time(); try { // http请求的url可以看到是/api/client/consumer/publish String url = MqConstanst.CONSUMERPRE + "/publish"; long start = System.nanoTime(); //使用httpClient执行post方法 PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true); long end = System.nanoTime(); //计算请求耗时 if (response.getTime() > 0) { long t = end - start - response.getTime(); t = (t - t % 1000000) / 1000000; //度量指标信息进行更新 MetricSingleton.getMetricRegistry() .histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t); } //事务设置成功状态 transaction.setStatus(Transaction.SUCCESS); //判断响应是否成功,如果成功,则直接返回,否则将请求设置为publish_fail,同时将器放入到cat链路追踪中 if (!response.isSuc()) { String json = JsonUtil.toJson(request); logger.error(response.getMsg()); CatRequest request2 = new CatRequest(); request2.setMethod("publish_fail"); request2.setJson(json); request2.setMsg(response.getMsg()); addCat(request2); } //响应设置为success return response.isSuc(); //否者,说明请求出现异常,此时需要将请求失败的度量信息放入,同时将失败信息放入到cat链路追踪中 } catch (Exception e) { MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName()) .inc(); logger.error("publish_error", e); String json = JsonUtil.toJson(request); transaction.setStatus(e); CatRequest request2 = new CatRequest(); request2.setMethod("publish"); request2.setJson(json); request2.setMsg(e.getMessage()); addCat(request2); //发送失败邮件异常信息 SendMailRequest mailRequest = new SendMailRequest(); mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName()); mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage()); mailRequest.setType(2); mailRequest.setTopicName(request.getTopicName()); sendMail(mailRequest); return false; } finally { //最终将事务设置为完成 transaction.complete(); timer1.stop(); } }
执行post请求:
protected <T> T post(Object request, String path, int tryCount, Class<T> class1, boolean isImportant) { T response = null; int count = 0; Exception last = null; String url = null; //如果响应为空,同时请求此时<尝试次数10次=tryCount while (response == null && count < tryCount) { //获取host String host = getHost(isImportant); url = host + path; try { //进行post请求 response = httpClient.post(url, request, class1); last = null; //异常处理:如果此时的url中不存在包含的信息,则访问url异常 //添加异常错误信息到cat中 ,发送httpClient的post请求 } catch (IOException e) { if (!(url.indexOf(MqConstanst.CONSUMERPRE + "/heartbeat") != -1 || url.indexOf(MqConstanst.CONSUMERPRE + "/getMetaGroup") != -1)) { logger.error("访问" + url + "异常,access_error", e); } addErrorCat(e, request, count, tryCount); last = e; } catch (BrokerException e) { last = e; addErrorCat(e, request, count, tryCount); } catch (Exception e) { last = e; addErrorCat(e, request, count, tryCount); } finally { //如果响应不为空,则判断是否是import导入,如果是则放入失败的请求url if (response != null) { if (isImportant) { failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000); } else { failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000); } //如果响应是publishMessageResponse实例,则进行降速,计数自减 if (response instanceof PublishMessageResponse) { PublishMessageResponse response2 = ((PublishMessageResponse) response); if (response2.getSleepTime() > 0) { response = null; logger.info(response2.getMsg()); Util.sleep(response2.getSleepTime()); // 这个不算重试,只是降速 count--; } } else { BaseResponse baseResponse = (BaseResponse) response; if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) { response = null; Util.sleep(1000); } else { if (!baseResponse.isSuc()) { logger.error(baseResponse.getMsg()); } } } } else { // response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。 if (isImportant) { failUrlG1.put(host, System.currentTimeMillis()); } else { failUrlG2.put(host, System.currentTimeMillis()); } Util.sleep(500); } } //计数++ count++; } if (last != null) { throw new RuntimeException(last); } return response; }
调用post:
public String post(String url, Object reqObj) throws IOException,BrokerException { String json = ""; if (reqObj != null) { json = JsonUtil.toJsonNull(reqObj); } Response response = null; //事务 Transaction transaction = Tracer.newTransaction("mq-http", url); try { RequestBody body = RequestBody.create(JSONTYPE, json); Request.Builder requestbuilder = new Request.Builder().url(url).post(body); //cat上下文 CatContext catContext=Tracer.logRemoteCallClient(); if(catContext!=null){ requestbuilder.addHeader(CatContext.CHILD, catContext.getProperty(CatContext.CHILD)); requestbuilder.addHeader(CatContext.PARENT, catContext.getProperty(CatContext.PARENT)); requestbuilder.addHeader(CatContext.ROOT, catContext.getProperty(CatContext.ROOT)); } //进行请求 Request request=requestbuilder.build(); response = client.newCall(request).execute(); //设置事务成功状态 transaction.setStatus(Transaction.SUCCESS); if (response.isSuccessful()) { return response.body().string(); } else { BrokerException exception = new BrokerException( response.code() + " error,and message is " + response.message()+",json is "+json); throw exception; } } catch (IOException e) { transaction.setStatus(e); throw e; }catch (Exception e) { transaction.setStatus(e); throw e; } finally { transaction.complete(); try { if (response != null) { response.close(); } } catch (Exception e) { } } }
也即从这里可以看出其请求最终会调用okHttp发送post请求,完成请求和响应。可以看到对于错误和异常的处理做得是非常细致的。下一篇来看pmq是怎么整合spring、启动服务端和客户端的。
本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2020-12-26
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句