首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ(二):RPC通讯

匠心零度 转载请注明原创出处,谢谢!

RocketMQ网络部署图

NameServer:在系统中肯定是做命名服务,更新和发现 broker服务。

Broker-Master:broker 消息主机服务器。

Broker-Slave: broker 消息从机服务器。

Producer: 消息生产者。

Consumer: 消息消费者。

rocketmq的几个核心的模块,而对于每个模块都是单独的jvm进程,我们看到上面的架构图的时候,那些箭头就是rocketmq的rpc调用,下面我们来看看rocketmq的rpc是如果进行封装实现的。

说明:rocketmq系列都将会以rocketmq-4.1.0-incubating进行介绍。

RocketMQ通信组件

先排除Master、Slave直接通过原生的nio进行调用,其他通讯都是基于netty-all-4.0.36.Final以及RocketMQ自定义协议进行通讯的。

网络协议定义如下

我们来看看header data里面的数据定义:

code对于Request来说就是RequestCode类里面的常量信息:

说明:公众号【匠心零度】回复:rocketmq,可获得基于rocketmq4.1.0加详细中文代码注释 。

code对于Response来说就是ResponseCode类里面的常量信息:

flag字段进行说明,其他后续分析到具体的具体分析。

flag = 0表示是request,flag = 1表示是response。

flag为2、3(二进制表示10、11)为oneway请求。

code=310很快我们就明白什么意思了:

对于下面类似a、b、c等可以简单查看下类SendMessageRequestHeaderV2(后续继续讲解)基本就是类似js压缩效果,可以借鉴学习下。

备注:RemotingCommand类包含了传输过程中所有数据的封装,还包括了编解码等操作(非常棒!!!解读为什么这样,从面向对象角度,谁拥有数据谁就对外提供操作这些数据的方法,这句话应该是大学的时候学习面向对象的时候看张孝祥老师说的,一直记忆犹新,的确应该这么设计,rocketmq就这么做的,再次学习)。

RocketMQ网络协议实现

UML类图

上面的图已经做到非常清晰了,RemotingClient接口定义了client应该具备那些功能,RemotingSever类似,主要有:registerProcessor、invokeSync(同步调用)、invokeAsync(异步调用)、invokeOneway(单向调用)等等,而RemotingClient与RemotingSever在三种调用的区别就是参数有所区别。

NettyRemotingAbstract是Server与Client公用处理的抽象。

BrokerOuterAPI、MQClientImpl:都封装了NettyRemotingClient(后续介绍)。

不管是client还是server通过RemotingService我们明白,启动都是在start里面,我们看看里面核心netty代码,以server里面代码为例:

备注:此处netty相关内容不进行深入展开,只会把涉及的的简单说明,后续另开系列进行说明。

涉及主要ChannelHandler简单说明

在进行tcp传输的时候经常会面临黏包/拆包问题,netty自带了很多通用的TCP黏包/拆包解决方案,下面我们看看rocketmq如何借助netty来实现编解码:NettyEncoder编码、NettyDecoder解码,rocketmq相关的网络协议上面内容已经说明过了。

NettyEncoder编码

NettyDecoder解码

netty中针对这四种场景均有对应的解码器作为解决方案,比如:

通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包拆包问题。

通过LineBasedFrameDecoder和StringDecoder来解决以回车换行符作为消息结束符的TCP黏包拆包的问题。

通过DelimiterBasedFrameDecoder 特殊分隔符解码器来解决以特殊符号作为消息结束符的TCP黏包拆包问题。

通过LengthFieldBasedFrameDecoder自定义长度解码器解决TCP黏包拆包问题。

rocketmq中使用的就是基于LengthFieldBasedFrameDecoder自定义长度解码器的。

IdleStateHandler:Netty自带的心跳检测。

NettyConnetManageHandle:主要就是链接管理,新连接、连接断开、异常、Idle等事件,每个事件过来存入NettyEventExecuter的队列里面。

NettyEventExecutor的run方法会不断的从队列里面取事件进行相应的处理:

NettyServerHandler:具体业务处理(后续会说到)。

核心NettyRemotingAbstract介绍

invokeSync(同步调用)进行说明:

opaque就相当与标识的这个请求,虽然rpc调用请求发送结束了,但是响应回来的时候还是会带有该信息就可以判断出是原来那个请求,比如响应回来之后执行原来给定的回调等。

通过countDownLatch来控制等待网络通信时间 :

invokeAsync(异步调用)进行说明:

与invokeSync(同步调用)基本类似,boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);//控制异步请求的个数以及超时和使用使用布尔原子变量,信号量保证只释放一次,对于异步invokeCallback不为空,需要进行调用。invokeOneway(单向调用)比较简单略过。

下面看看消息接收处理:

备注:这里判断是request还是response都是通过header里面的flag标记来判断的,上面已经说明。

processResponseCommand在介绍上面三种发送的时候说过了,下面重点看看processRequestCommand

备注:这里需要做流控,要求线程池对应的队列必须是有大小限制的,是通过线程池进行限流的。

友情推荐

iocoder.png

参考:

RocketMQ原理介绍V3.1.1

netty源码分析之LengthFieldBasedFrameDecoder:https://www.jianshu.com/p/a0a51fd79f62

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180404G1HRMH00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券