学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。
可以看到生产者和消费者都是基于pulish接口进行请求的,同时publish请求的实质是调用httpClient的post请求,而在调用post请求时,需要考虑各种异常的请求和失败的请求。而异常请求,则会将异常信息放入到cat链路中,而放入到cat的过程中,也是基于httpClient调用post请求,放入的。同时请求失败,会进行失败重试,而重试的次数是10次。最终失败会发生失败邮件,进行预警。同时里面有一个mqContext、catContext。同时生产者和消费者属于客户端,broker是服务端,这个和RocketMQ是类似的。
其流程:
pmq发送/消费消息流程
下面的代码来源于信也开源的pmq。
消息生产服务:
/**
* 消息生产者服务
*/
@RestController
public class TestController {
@GetMapping("/test1")
public void test1(@RequestParam String topicName, @RequestParam int count) {
if (Util.isEmpty(topicName))
return;
//使用单例线程池提交任务
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
for (int i = 1; i < count; i++) {
try {
//进行publish操作
MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i)));
} catch (MqNotInitException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ContentExceed65535Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Util.sleep(10);
}
}
});
}
}
消息消费服务:
/**
* 消息消费服务
*/
public class TestSub implements ISubscriber {
@Override
public List<Long> onMessageReceived(List<MessageDto> messages) {
try {
//接收消息,执行publish操作
MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody()));
} catch (MqNotInitException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ContentExceed65535Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
}
下面我们来看一下:也即生产者和消费者都会调用一个接口publish接口
MqClient.publish(topicName, "", new ProducerDataDto(String.valueOf(i))); //生产消息
MqClient.publish("test2", null, new ProducerDataDto(messages.get(0).getBody())); //消费消息
来看这两个方法,测试中的这两个方法都会调用publish方法:
public static boolean publish(String topic, String token, List<ProducerDataDto> messages,
IPartitionSelector iPartitionSelector) throws MqNotInitException, ContentExceed65535Exception {
// TODO Auto-generated method stub
//如果没有初始化,则抛mq没有初始化异常
if (!hasInit()) {
throw new MqNotInitException();
}
//创建publishMessage请求对象
PublishMessageRequest request = null;
//对消息进行校验
if (messages != null && messages.size() > 0) {
for (ProducerDataDto t1 : messages) {
if (MessageUtil.checkMessageExceed65535(t1.getBody())) {
throw new ContentExceed65535Exception();
}
t1.setPartitionInfo(getPartitionId(topic, t1, iPartitionSelector));
}
//创建publishMessage请求对象,设置clientIp、token信息、topic名称、消息信息
try {
request = new PublishMessageRequest();
request.setClientIp(mqContext.getConfig().getIp());
request.setToken(token);
request.setTopicName(topic);
request.setMsgs(messages);
checkBody(messages);
//调用mq上下文,进行publish操作
return mqContext.getMqResource().publish(request, mqContext.getConfig().getPbRetryTimes());
} catch (Exception e) {
log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
return false;
}
}
return true;
}
publish操作:
public boolean publish(PublishMessageRequest request, int retryTimes) {
if (request == null) {
return true;
}
//创建事务
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
.timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
// http请求的url可以看到是/api/client/consumer/publish
String url = MqConstanst.CONSUMERPRE + "/publish";
long start = System.nanoTime();
//使用httpClient执行post方法
PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
long end = System.nanoTime();
//计算请求耗时
if (response.getTime() > 0) {
long t = end - start - response.getTime();
t = (t - t % 1000000) / 1000000;
//度量指标信息进行更新
MetricSingleton.getMetricRegistry()
.histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
}
//事务设置成功状态
transaction.setStatus(Transaction.SUCCESS);
//判断响应是否成功,如果成功,则直接返回,否则将请求设置为publish_fail,同时将器放入到cat链路追踪中
if (!response.isSuc()) {
String json = JsonUtil.toJson(request);
logger.error(response.getMsg());
CatRequest request2 = new CatRequest();
request2.setMethod("publish_fail");
request2.setJson(json);
request2.setMsg(response.getMsg());
addCat(request2);
}
//响应设置为success
return response.isSuc();
//否者,说明请求出现异常,此时需要将请求失败的度量信息放入,同时将失败信息放入到cat链路追踪中
} catch (Exception e) {
MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
.inc();
logger.error("publish_error", e);
String json = JsonUtil.toJson(request);
transaction.setStatus(e);
CatRequest request2 = new CatRequest();
request2.setMethod("publish");
request2.setJson(json);
request2.setMsg(e.getMessage());
addCat(request2);
//发送失败邮件异常信息
SendMailRequest mailRequest = new SendMailRequest();
mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
mailRequest.setType(2);
mailRequest.setTopicName(request.getTopicName());
sendMail(mailRequest);
return false;
} finally {
//最终将事务设置为完成
transaction.complete();
timer1.stop();
}
}
执行post请求:
protected <T> T post(Object request, String path, int tryCount, Class<T> class1, boolean isImportant) {
T response = null;
int count = 0;
Exception last = null;
String url = null;
//如果响应为空,同时请求此时<尝试次数10次=tryCount
while (response == null && count < tryCount) {
//获取host
String host = getHost(isImportant);
url = host + path;
try {
//进行post请求
response = httpClient.post(url, request, class1);
last = null;
//异常处理:如果此时的url中不存在包含的信息,则访问url异常
//添加异常错误信息到cat中 ,发送httpClient的post请求
} catch (IOException e) {
if (!(url.indexOf(MqConstanst.CONSUMERPRE + "/heartbeat") != -1
|| url.indexOf(MqConstanst.CONSUMERPRE + "/getMetaGroup") != -1)) {
logger.error("访问" + url + "异常,access_error", e);
}
addErrorCat(e, request, count, tryCount);
last = e;
} catch (BrokerException e) {
last = e;
addErrorCat(e, request, count, tryCount);
} catch (Exception e) {
last = e;
addErrorCat(e, request, count, tryCount);
} finally {
//如果响应不为空,则判断是否是import导入,如果是则放入失败的请求url
if (response != null) {
if (isImportant) {
failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
} else {
failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
}
//如果响应是publishMessageResponse实例,则进行降速,计数自减
if (response instanceof PublishMessageResponse) {
PublishMessageResponse response2 = ((PublishMessageResponse) response);
if (response2.getSleepTime() > 0) {
response = null;
logger.info(response2.getMsg());
Util.sleep(response2.getSleepTime());
// 这个不算重试,只是降速
count--;
}
} else {
BaseResponse baseResponse = (BaseResponse) response;
if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
response = null;
Util.sleep(1000);
} else {
if (!baseResponse.isSuc()) {
logger.error(baseResponse.getMsg());
}
}
}
} else {
// response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
if (isImportant) {
failUrlG1.put(host, System.currentTimeMillis());
} else {
failUrlG2.put(host, System.currentTimeMillis());
}
Util.sleep(500);
}
}
//计数++
count++;
}
if (last != null) {
throw new RuntimeException(last);
}
return response;
}
调用post:
public String post(String url, Object reqObj) throws IOException,BrokerException {
String json = "";
if (reqObj != null) {
json = JsonUtil.toJsonNull(reqObj);
}
Response response = null;
//事务
Transaction transaction = Tracer.newTransaction("mq-http", url);
try {
RequestBody body = RequestBody.create(JSONTYPE, json);
Request.Builder requestbuilder = new Request.Builder().url(url).post(body);
//cat上下文
CatContext catContext=Tracer.logRemoteCallClient();
if(catContext!=null){
requestbuilder.addHeader(CatContext.CHILD, catContext.getProperty(CatContext.CHILD));
requestbuilder.addHeader(CatContext.PARENT, catContext.getProperty(CatContext.PARENT));
requestbuilder.addHeader(CatContext.ROOT, catContext.getProperty(CatContext.ROOT));
}
//进行请求
Request request=requestbuilder.build();
response = client.newCall(request).execute();
//设置事务成功状态
transaction.setStatus(Transaction.SUCCESS);
if (response.isSuccessful()) {
return response.body().string();
} else {
BrokerException exception = new BrokerException(
response.code() + " error,and message is " + response.message()+",json is "+json);
throw exception;
}
} catch (IOException e) {
transaction.setStatus(e);
throw e;
}catch (Exception e) {
transaction.setStatus(e);
throw e;
}
finally {
transaction.complete();
try {
if (response != null) {
response.close();
}
} catch (Exception e) {
}
}
}
也即从这里可以看出其请求最终会调用okHttp发送post请求,完成请求和响应。可以看到对于错误和异常的处理做得是非常细致的。下一篇来看pmq是怎么整合spring、启动服务端和客户端的。