专栏首页后端技术学习pmq学习二-生产者和消费者流程

pmq学习二-生产者和消费者流程

学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。

可以看到生产者和消费者都是基于pulish接口进行请求的,同时publish请求的实质是调用httpClient的post请求,而在调用post请求时,需要考虑各种异常的请求和失败的请求。而异常请求,则会将异常信息放入到cat链路中,而放入到cat的过程中,也是基于httpClient调用post请求,放入的。同时请求失败,会进行失败重试,而重试的次数是10次。最终失败会发生失败邮件,进行预警。同时里面有一个mqContext、catContext。同时生产者和消费者属于客户端,broker是服务端,这个和RocketMQ是类似的。

其流程:

pmq发送/消费消息流程

下面的代码来源于信也开源的pmq。

消息生产服务:

/**
 * 消息生产者服务
 */
@RestController
public class TestController {
    @GetMapping("/test1")
    public void test1(@RequestParam String topicName, @RequestParam int count) {
        if (Util.isEmpty(topicName))
            return;
        //使用单例线程池提交任务
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i < count; i++) {
                    try {
                        //进行publish操作
                        MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i)));
                    } catch (MqNotInitException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ContentExceed65535Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    Util.sleep(10);
                }
            }
        });
    }

}

消息消费服务:

/**
 * 消息消费服务
 */
public class TestSub implements ISubscriber {
    @Override
    public List<Long> onMessageReceived(List<MessageDto> messages) {
        try {
            //接收消息,执行publish操作
            MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody()));
        } catch (MqNotInitException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ContentExceed65535Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

}

下面我们来看一下:也即生产者和消费者都会调用一个接口publish接口

  MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i))); //生产消息
  MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody())); //消费消息

来看这两个方法,测试中的这两个方法都会调用publish方法:

public static boolean publish(String topic, String token, List<ProducerDataDto> messages,
      IPartitionSelector iPartitionSelector) throws MqNotInitException, ContentExceed65535Exception {
   // TODO Auto-generated method stub
    //如果没有初始化,则抛mq没有初始化异常
   if (!hasInit()) {
      throw new MqNotInitException();
   }
    //创建publishMessage请求对象
   PublishMessageRequest request = null;
    //对消息进行校验
   if (messages != null && messages.size() > 0) {
      for (ProducerDataDto t1 : messages) {
         if (MessageUtil.checkMessageExceed65535(t1.getBody())) {
            throw new ContentExceed65535Exception();
         }
         t1.setPartitionInfo(getPartitionId(topic, t1, iPartitionSelector));
      }
       //创建publishMessage请求对象,设置clientIp、token信息、topic名称、消息信息
      try {
         request = new PublishMessageRequest();
         request.setClientIp(mqContext.getConfig().getIp());
         request.setToken(token);
         request.setTopicName(topic);
         request.setMsgs(messages);
         checkBody(messages);
          //调用mq上下文,进行publish操作
         return mqContext.getMqResource().publish(request, mqContext.getConfig().getPbRetryTimes());
      } catch (Exception e) {
         log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
         return false;
      }
   }
   return true;
}

publish操作:

public boolean publish(PublishMessageRequest request, int retryTimes) {
   if (request == null) {
      return true;
   }
    //创建事务
   Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
   Timer.Context timer1 = MetricSingleton.getMetricRegistry()
         .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
   try {
      // http请求的url可以看到是/api/client/consumer/publish
      String url = MqConstanst.CONSUMERPRE + "/publish";
      long start = System.nanoTime();
       //使用httpClient执行post方法
      PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
      long end = System.nanoTime();
       //计算请求耗时
      if (response.getTime() > 0) {
         long t = end - start - response.getTime();
         t = (t - t % 1000000) / 1000000;
          //度量指标信息进行更新
         MetricSingleton.getMetricRegistry()
               .histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
      }
       //事务设置成功状态
      transaction.setStatus(Transaction.SUCCESS);
       //判断响应是否成功,如果成功,则直接返回,否则将请求设置为publish_fail,同时将器放入到cat链路追踪中
      if (!response.isSuc()) {
         String json = JsonUtil.toJson(request);
         logger.error(response.getMsg());
         CatRequest request2 = new CatRequest();
         request2.setMethod("publish_fail");
         request2.setJson(json);
         request2.setMsg(response.getMsg());
         addCat(request2);
      }
       //响应设置为success
      return response.isSuc();
    //否者,说明请求出现异常,此时需要将请求失败的度量信息放入,同时将失败信息放入到cat链路追踪中   
   } catch (Exception e) {
      MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
            .inc();
      logger.error("publish_error", e);
      String json = JsonUtil.toJson(request);
      transaction.setStatus(e);
      CatRequest request2 = new CatRequest();
      request2.setMethod("publish");
      request2.setJson(json);
      request2.setMsg(e.getMessage());
      addCat(request2);

       //发送失败邮件异常信息
      SendMailRequest mailRequest = new SendMailRequest();
      mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
      mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
      mailRequest.setType(2);
      mailRequest.setTopicName(request.getTopicName());
      sendMail(mailRequest);
      return false;
   } finally {
      //最终将事务设置为完成 
      transaction.complete();
      timer1.stop();
   }
}

执行post请求:

protected <T> T post(Object request, String path, int tryCount, Class<T> class1, boolean isImportant) {
   T response = null;
   int count = 0;
   Exception last = null;
   String url = null;
    //如果响应为空,同时请求此时<尝试次数10次=tryCount
   while (response == null && count < tryCount) {
     //获取host  
      String host = getHost(isImportant);
      url = host + path;
      try {
          //进行post请求
         response = httpClient.post(url, request, class1);
         last = null;
      //异常处理:如果此时的url中不存在包含的信息,则访问url异常
       //添加异常错误信息到cat中 ,发送httpClient的post请求    
      } catch (IOException e) {
         if (!(url.indexOf(MqConstanst.CONSUMERPRE + "/heartbeat") != -1
               || url.indexOf(MqConstanst.CONSUMERPRE + "/getMetaGroup") != -1)) {
            logger.error("访问" + url + "异常,access_error", e);
         }
         addErrorCat(e, request, count, tryCount);
         last = e;
      } catch (BrokerException e) {
         last = e;
         addErrorCat(e, request, count, tryCount);
      } catch (Exception e) {
         last = e;
         addErrorCat(e, request, count, tryCount);
      } finally {
          //如果响应不为空,则判断是否是import导入,如果是则放入失败的请求url
         if (response != null) {
            if (isImportant) {
               failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
            } else {
               failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
            }
             //如果响应是publishMessageResponse实例,则进行降速,计数自减
            if (response instanceof PublishMessageResponse) {
               PublishMessageResponse response2 = ((PublishMessageResponse) response);
               if (response2.getSleepTime() > 0) {
                  response = null;
                  logger.info(response2.getMsg());
                  Util.sleep(response2.getSleepTime());
                  // 这个不算重试,只是降速
                  count--;
               }
            } else {
               BaseResponse baseResponse = (BaseResponse) response;
               if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
                  response = null;
                  Util.sleep(1000);
               } else {
                  if (!baseResponse.isSuc()) {
                     logger.error(baseResponse.getMsg());
                  }
               }
            }
         } else {
            // response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
            if (isImportant) {
               failUrlG1.put(host, System.currentTimeMillis());
            } else {
               failUrlG2.put(host, System.currentTimeMillis());
            }
            Util.sleep(500);
         }
      }
       //计数++
      count++;
   }
   if (last != null) {          
      throw new RuntimeException(last);
   }
   return response;
}

调用post:

public String post(String url, Object reqObj) throws IOException,BrokerException {
   String json = "";
   if (reqObj != null) { 
      json = JsonUtil.toJsonNull(reqObj);
   }
   Response response = null;
    //事务
   Transaction transaction = Tracer.newTransaction("mq-http", url);
   try {
      RequestBody body = RequestBody.create(JSONTYPE, json);
      Request.Builder requestbuilder = new Request.Builder().url(url).post(body);
       //cat上下文
      CatContext catContext=Tracer.logRemoteCallClient();
      if(catContext!=null){
         requestbuilder.addHeader(CatContext.CHILD, catContext.getProperty(CatContext.CHILD));
         requestbuilder.addHeader(CatContext.PARENT, catContext.getProperty(CatContext.PARENT));
         requestbuilder.addHeader(CatContext.ROOT, catContext.getProperty(CatContext.ROOT));
      }
       //进行请求
      Request request=requestbuilder.build();
      response = client.newCall(request).execute();  
      //设置事务成功状态 
      transaction.setStatus(Transaction.SUCCESS);          
      if (response.isSuccessful()) {
         return response.body().string();
      } else {
         BrokerException exception = new BrokerException(
               response.code() + " error,and message is " + response.message()+",json is "+json);
         throw exception;
      }
   } catch (IOException e) {
      transaction.setStatus(e);
      throw e;
   }catch (Exception e) {
      transaction.setStatus(e);
      throw e;
   }
   finally {
      transaction.complete();
      try {
         if (response != null) {
            response.close();
         }
      } catch (Exception e) {

      }
   }
}

也即从这里可以看出其请求最终会调用okHttp发送post请求,完成请求和响应。可以看到对于错误和异常的处理做得是非常细致的。下一篇来看pmq是怎么整合spring、启动服务端和客户端的。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-12-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ学习5

    进行消息发送的过程首先会准备好路由信息,最终是由netty完成的,也即使用nettyRemotingClient来实现的。

    路行的亚洲
  • ConcurrentHashMap源码学习

    既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与...

    路行的亚洲
  • pmq学习总结

    进行重平衡会做的一个操作:通知重平衡,而过程中会更新重平衡的版本,同时设置通知的消息,同时将通知的消息插入到通知消息服务中,也即会插入到通知消息表中。而在生产者...

    路行的亚洲
  • Spring MVC请求处理过程。你这样回答保证通过面试!

    SpringMVC请求处理相信大家都很熟悉了,本篇主要是基于SpringMVC处理请求的流程来阅读并调试源码,以及解决几个仅靠流程图无法解释的问题。

    程序员白楠楠
  • BlockCanary源码解析

    如上代码中的loop()方法是Looper中的,我们的目的是监测主线程的卡顿问题,因为UI更新界面都是在主线程中进行的,所以在主线程中做耗时操作可能会造成界面卡...

    大大大大大先生
  • golang 设置 http response 响应头与坑

    之前遇到个问题,在一段代码中这样设置WriteHeader,最后在header中取Name时怎么也取不到。

    我的小碗汤
  • golang 设置 http response 响应头与坑

    之前遇到个问题,在一段代码中这样设置WriteHeader,最后在header中取Name时怎么也取不到。

    我的小碗汤
  • 建立一个跨平台可复用C++代码的实例工程(二)windows,android下webview中js调用原生代码统一接口

    三端界面统一用webview加载h5实现,所以需要统一定义一个js调用原生代码的接口。其中windows方面比较好实现,用CefV8Handler,OnWebK...

    xiny120
  • Code | Python30个编程技巧!

    1. 原地交换两个数字 Python 提供了一个直观的在一行代码中赋值与交换(变量值)的方法,请参见下面的示例: ? 3. 使用三元操作符来进行条件赋值 三元...

    IT派
  • 安卓开发之mqtt协议实例代码

    首先物联网协议mqtt协议是基于tcp/ip协议的,使用了官方的mqttclient框架

    砸漏

扫码关注云+社区

领取腾讯云代金券