前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习二-生产者和消费者流程

pmq学习二-生产者和消费者流程

作者头像
路行的亚洲
发布2021-01-05 12:22:05
6390
发布2021-01-05 12:22:05
举报
文章被收录于专栏:后端技术学习后端技术学习

学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。

可以看到生产者和消费者都是基于pulish接口进行请求的,同时publish请求的实质是调用httpClient的post请求,而在调用post请求时,需要考虑各种异常的请求和失败的请求。而异常请求,则会将异常信息放入到cat链路中,而放入到cat的过程中,也是基于httpClient调用post请求,放入的。同时请求失败,会进行失败重试,而重试的次数是10次。最终失败会发生失败邮件,进行预警。同时里面有一个mqContext、catContext。同时生产者和消费者属于客户端,broker是服务端,这个和RocketMQ是类似的。

其流程:

pmq发送/消费消息流程

下面的代码来源于信也开源的pmq。

消息生产服务:

代码语言:javascript
复制
/**
 * 消息生产者服务
 */
@RestController
public class TestController {
    @GetMapping("/test1")
    public void test1(@RequestParam String topicName, @RequestParam int count) {
        if (Util.isEmpty(topicName))
            return;
        //使用单例线程池提交任务
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i < count; i++) {
                    try {
                        //进行publish操作
                        MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i)));
                    } catch (MqNotInitException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ContentExceed65535Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    Util.sleep(10);
                }
            }
        });
    }

}

消息消费服务:

代码语言:javascript
复制
/**
 * 消息消费服务
 */
public class TestSub implements ISubscriber {
    @Override
    public List<Long> onMessageReceived(List<MessageDto> messages) {
        try {
            //接收消息,执行publish操作
            MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody()));
        } catch (MqNotInitException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ContentExceed65535Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

}

下面我们来看一下:也即生产者和消费者都会调用一个接口publish接口

代码语言:javascript
复制
  MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i))); //生产消息
  MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody())); //消费消息

来看这两个方法,测试中的这两个方法都会调用publish方法:

代码语言:javascript
复制
public static boolean publish(String topic, String token, List<ProducerDataDto> messages,
      IPartitionSelector iPartitionSelector) throws MqNotInitException, ContentExceed65535Exception {
   // TODO Auto-generated method stub
    //如果没有初始化,则抛mq没有初始化异常
   if (!hasInit()) {
      throw new MqNotInitException();
   }
    //创建publishMessage请求对象
   PublishMessageRequest request = null;
    //对消息进行校验
   if (messages != null && messages.size() > 0) {
      for (ProducerDataDto t1 : messages) {
         if (MessageUtil.checkMessageExceed65535(t1.getBody())) {
            throw new ContentExceed65535Exception();
         }
         t1.setPartitionInfo(getPartitionId(topic, t1, iPartitionSelector));
      }
       //创建publishMessage请求对象,设置clientIp、token信息、topic名称、消息信息
      try {
         request = new PublishMessageRequest();
         request.setClientIp(mqContext.getConfig().getIp());
         request.setToken(token);
         request.setTopicName(topic);
         request.setMsgs(messages);
         checkBody(messages);
          //调用mq上下文,进行publish操作
         return mqContext.getMqResource().publish(request, mqContext.getConfig().getPbRetryTimes());
      } catch (Exception e) {
         log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
         return false;
      }
   }
   return true;
}

publish操作:

代码语言:javascript
复制
public boolean publish(PublishMessageRequest request, int retryTimes) {
   if (request == null) {
      return true;
   }
    //创建事务
   Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
   Timer.Context timer1 = MetricSingleton.getMetricRegistry()
         .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
   try {
      // http请求的url可以看到是/api/client/consumer/publish
      String url = MqConstanst.CONSUMERPRE + "/publish";
      long start = System.nanoTime();
       //使用httpClient执行post方法
      PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
      long end = System.nanoTime();
       //计算请求耗时
      if (response.getTime() > 0) {
         long t = end - start - response.getTime();
         t = (t - t % 1000000) / 1000000;
          //度量指标信息进行更新
         MetricSingleton.getMetricRegistry()
               .histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
      }
       //事务设置成功状态
      transaction.setStatus(Transaction.SUCCESS);
       //判断响应是否成功,如果成功,则直接返回,否则将请求设置为publish_fail,同时将器放入到cat链路追踪中
      if (!response.isSuc()) {
         String json = JsonUtil.toJson(request);
         logger.error(response.getMsg());
         CatRequest request2 = new CatRequest();
         request2.setMethod("publish_fail");
         request2.setJson(json);
         request2.setMsg(response.getMsg());
         addCat(request2);
      }
       //响应设置为success
      return response.isSuc();
    //否者,说明请求出现异常,此时需要将请求失败的度量信息放入,同时将失败信息放入到cat链路追踪中   
   } catch (Exception e) {
      MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
            .inc();
      logger.error("publish_error", e);
      String json = JsonUtil.toJson(request);
      transaction.setStatus(e);
      CatRequest request2 = new CatRequest();
      request2.setMethod("publish");
      request2.setJson(json);
      request2.setMsg(e.getMessage());
      addCat(request2);

       //发送失败邮件异常信息
      SendMailRequest mailRequest = new SendMailRequest();
      mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
      mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
      mailRequest.setType(2);
      mailRequest.setTopicName(request.getTopicName());
      sendMail(mailRequest);
      return false;
   } finally {
      //最终将事务设置为完成 
      transaction.complete();
      timer1.stop();
   }
}

执行post请求:

代码语言:javascript
复制
protected <T> T post(Object request, String path, int tryCount, Class<T> class1, boolean isImportant) {
   T response = null;
   int count = 0;
   Exception last = null;
   String url = null;
    //如果响应为空,同时请求此时<尝试次数10次=tryCount
   while (response == null && count < tryCount) {
     //获取host  
      String host = getHost(isImportant);
      url = host + path;
      try {
          //进行post请求
         response = httpClient.post(url, request, class1);
         last = null;
      //异常处理:如果此时的url中不存在包含的信息,则访问url异常
       //添加异常错误信息到cat中 ,发送httpClient的post请求    
      } catch (IOException e) {
         if (!(url.indexOf(MqConstanst.CONSUMERPRE + "/heartbeat") != -1
               || url.indexOf(MqConstanst.CONSUMERPRE + "/getMetaGroup") != -1)) {
            logger.error("访问" + url + "异常,access_error", e);
         }
         addErrorCat(e, request, count, tryCount);
         last = e;
      } catch (BrokerException e) {
         last = e;
         addErrorCat(e, request, count, tryCount);
      } catch (Exception e) {
         last = e;
         addErrorCat(e, request, count, tryCount);
      } finally {
          //如果响应不为空,则判断是否是import导入,如果是则放入失败的请求url
         if (response != null) {
            if (isImportant) {
               failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
            } else {
               failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
            }
             //如果响应是publishMessageResponse实例,则进行降速,计数自减
            if (response instanceof PublishMessageResponse) {
               PublishMessageResponse response2 = ((PublishMessageResponse) response);
               if (response2.getSleepTime() > 0) {
                  response = null;
                  logger.info(response2.getMsg());
                  Util.sleep(response2.getSleepTime());
                  // 这个不算重试,只是降速
                  count--;
               }
            } else {
               BaseResponse baseResponse = (BaseResponse) response;
               if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
                  response = null;
                  Util.sleep(1000);
               } else {
                  if (!baseResponse.isSuc()) {
                     logger.error(baseResponse.getMsg());
                  }
               }
            }
         } else {
            // response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
            if (isImportant) {
               failUrlG1.put(host, System.currentTimeMillis());
            } else {
               failUrlG2.put(host, System.currentTimeMillis());
            }
            Util.sleep(500);
         }
      }
       //计数++
      count++;
   }
   if (last != null) {          
      throw new RuntimeException(last);
   }
   return response;
}

调用post:

代码语言:javascript
复制
public String post(String url, Object reqObj) throws IOException,BrokerException {
   String json = "";
   if (reqObj != null) { 
      json = JsonUtil.toJsonNull(reqObj);
   }
   Response response = null;
    //事务
   Transaction transaction = Tracer.newTransaction("mq-http", url);
   try {
      RequestBody body = RequestBody.create(JSONTYPE, json);
      Request.Builder requestbuilder = new Request.Builder().url(url).post(body);
       //cat上下文
      CatContext catContext=Tracer.logRemoteCallClient();
      if(catContext!=null){
         requestbuilder.addHeader(CatContext.CHILD, catContext.getProperty(CatContext.CHILD));
         requestbuilder.addHeader(CatContext.PARENT, catContext.getProperty(CatContext.PARENT));
         requestbuilder.addHeader(CatContext.ROOT, catContext.getProperty(CatContext.ROOT));
      }
       //进行请求
      Request request=requestbuilder.build();
      response = client.newCall(request).execute();  
      //设置事务成功状态 
      transaction.setStatus(Transaction.SUCCESS);          
      if (response.isSuccessful()) {
         return response.body().string();
      } else {
         BrokerException exception = new BrokerException(
               response.code() + " error,and message is " + response.message()+",json is "+json);
         throw exception;
      }
   } catch (IOException e) {
      transaction.setStatus(e);
      throw e;
   }catch (Exception e) {
      transaction.setStatus(e);
      throw e;
   }
   finally {
      transaction.complete();
      try {
         if (response != null) {
            response.close();
         }
      } catch (Exception e) {

      }
   }
}

也即从这里可以看出其请求最终会调用okHttp发送post请求,完成请求和响应。可以看到对于错误和异常的处理做得是非常细致的。下一篇来看pmq是怎么整合spring、启动服务端和客户端的。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档