一、服务消费者
服务引用时流程会走到DubboProtocol#refer方法,之前篇章中没有提及Netty环节,本节补上
在getClients时如果有共享链接则直接使用,反之,通过信息交换层Exchangers进行connect得到客户端client实例
可以看到信息交换层Exchanger是个SPI扩展点,默认实现HeaderExchanger
谈谈为什么心跳探测要在客户端发起,而不是服务端去探测?如何应对网络抖动情况下的节点管理?动态注册中心可以把异常的节点及时去除然后通知到消费端,但是如果是因为网络抖动误判就比较麻烦了,可以设置比例但是消费端感知会有时延。那么换个思路解决这个问题,服务消费者并不严格以注册中心的服务节点为准,而是根据实际情况来判断提供者节点是否可用
接下来看下传输层Transporters.connect中的业务逻辑
Transporter是个SPI自适应扩展点,通过URL中的值获取相应的实现类,默认是NettyClient,系统支持GrizzlyClient、MinaClient、NettyClient、NettyClient4。所以connect方法最终会走到NettyClient#connect方法
channelHandler参数是包装了HeaderExchangeHandler的DecodeHandler,静态变量中的nioEventLoopGroup字段,它设置的线程数是Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),”CPU核数加1″或”32″,大家平时研发线程数设置可以参考这个,另外线程数不是越多越好,业界推荐的计算方法是最佳线程数=CPU核数*[1+(I/O耗时/CPU耗时)]
另外在NettyClient的父类构造方法中进行了handler包装可以看到包装成了包括心跳处理器的MuiltMessageHandler,这里的请求调度器Dispatcher是通过自适应扩展器得到的,默认实现是AllDispatcher
在抽象类AbstractClient的构造方法中调用了模板方法doOpen和doConnect,那么我们先看下NettyClient的doOpen方法
主要业务逻辑是进行创建Netty客户端,客户端只需要创建一个 EventLoopGroup即可,然后将编解码、心跳、业务处理器注册到pipeline事件流中。另外connect方法无非就是调用ChannelFuture future = bootstrap.connect(getConnectAddress());得到future然后阻塞等待结果。另外配置ChannelOption选项时,通过TCP_NODELAY控制禁用了Nagle优化,纳格算法的工作方式是合并一定数量的输出数据后一次提交。特别的是,只要有已提交的数据包尚未确认,发送者会持续缓冲数据包,直到累积一定数量的数据才提交,不过该算法与 TCP延迟确认会有不好的相互作用
我们知道服务引用时会调用DubboInvoker#doInvoke方法
通过currentClient.request(inv,timeout)将RpcInvocation传入,继续跟下来会走到NettyChannel#send方法中
这里需要注意两点:
1、writeAndFlush写队列并刷新,实际上netty会把要发送的消息保存在ChannelOutboundBuffer里面,如果网络对方处理速度比较慢或者消息发送比较快或者消息发送量过大都有可能导致内存溢出;。建议优化方式:通过启动项channelOption设置发送队列的长度,或者通过-D启动参数配置长度
2、nettyChannel#send方法中没有直接进行回推发送失败的消息,因为Dubbo提供了容错补偿机制,如果在日常中使用Netty需要自己处理发送失败的消息,不可生吞异常
流程复习 RegistryProtocol#export=>DubboProtocol#export=>DubboProtocol#openServer=>DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>Transporters#bind=>NettyServer#init=>AbstractServer#init=>NettyServer#doOpen
初始化EventLoopGroup-bossGroup,它只负责认证授权、连接然后将socket注册到IO连接池中的某个channel,线程数为1,不过Netty推荐使用的是线程池;初始化EventLoopGroup-workerGroup,负责IO读写;将EventLoopGroup注册到bootstrap并处理连接,可以看到使用的是一主多从线程模型,Netty根据group参数设置不多的reactor线程模型,默认支持单线程、多线程模型、主从多线程模型,配置非常灵活;最后配置channel,配置连接处理器,然后绑定,阻塞等待关闭
当接收到客户端的消息时,NettyServerHandler#channelRead
AllChannelHandler#received
提交ChannelEventRunnabley线程任务,根据state不同进行处理,接收信息、连接、断连、发送、异常
DecodeHandler#received
HeaderExchangeHandler#received=>this#handleRequest。流程复习:DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>HeaderExchangeHandler#init,所以构造方法中的handler参数其实是DubboProtcol中的requestHandler
HeaderExchangerHandler#handleRequest
DubboProtocol.requestHandler#reply
Invoker.invoke调用链大概是
echoFilter=>genericFilter=>contextFilter=>traceFilter=>timeoutFilter=>exceptionFilter=>RegistryProrocol$invokerdelegate=>JavassistProxyFactory#getInvoker#doinvoke
通过wrapper包装类执行invokeMethod得到结果,然后通过netty的channel响应消息给服务引用者