在客户端已经从注册中心拉取和订阅服务列表完毕的前提下,Dubbo 完成一次完整的 RPC 调用,流程如下:
将上面的步骤进行细化,在一次 RPC 调用过程中,Cluster 层的流程如下:
其中步骤 1, 2, 3 是模板方法,使用通用的校验、参数准备等准备工作。最终,不同的容错机制的子类实现不同的 doInvoke
方法,每个子类方法都有各自的路由、负载均衡实现策略。
本章节主要总结 RPC 在 Cluster 层的工作,涉及步骤 1, 2, 3, 4,其中容错机制见5.1,容错过程中获取 Invoker 列表需要用到 Directory,见5.2;Directory 过程中需要用到路由,见5.3;负载均衡见5.4。剩余步骤 5, 6, 7 是具体的 RPC 调用,见第六章。
容错过程是在各容错机制实现子类的 doInvoke
方法重写实现的。容错过程对上层用户是完全透明的,上层用户不用关心容错过程是怎么实现的,同时用户也可以通过不同的配置项来选择不同的容错机制。支持的容错机制如下:
注: 大部分容错机制的核心步骤都是:
在上述步骤 3 容错机制的调用中,主要步骤都是:
如果有不同,在各自条目中进行说明
RpcException
,抛出并返回,不做任何重试;invocation
缓存到 ConcurrentHashMap
中,并返回空结果集;同时设置定时线程池,定时时间到了就将失败的任务投入线程池,重新请求;BlockingQueue
实现的;将多个调用任务投入线程池后,任务执行结果投入 BlockingQueue
;BlockingQueue
抛出异常;此时记录异常次数,只有到记录异常次数等于服务数量时,说明所有服务都抛出异常,此时再将异常信息投入 BlockingQueue
BlockingQueue # poll(int)
方法拉取结果,拉取到第一个结果就返回。如果返回值正常,就是其中一个服务的返回结果;如果返回值为 Exception
类型,说明所有服务都出现异常;容错过程中需要获取 Invoker 列表,用于后续的路由和负载均衡。这个过程需要用到 Directory # list
方法执行。Directory 接口有一个抽象类 AbstractDirectory,以及两个主要实现类:动态列表 RegistryDirectory,以及静态列表 StaticDirectory。主要总结的是动态列表 RegistryDirectory
,以及封装了基础方法的抽象类 AbstractDirectory
。
RegistryDirectory
主要实现了两个功能:
doList
方法;注册中心订阅的部分主要在 ZookeeperRegistry # doSubscribe()
方法中实现,见第二章注册中心部分。
在监听到注册中心对应 URL 变化后,触发 RegistryDirectory
对各种本地配置的动态更新。更新的配置包括:
RouterFactory
将 URL 包装成路由规则(见5.3),更新本地路由信息; doList
方法主要作用,就是调用路由方法。
注:路由的整体思路与笔者设计的动态汇总统计业务不谋而合,通过表达式的方式实现数据的处理。
路由会根据用户配置的不同路由策略,对 Invoker 列表进行过滤。主要分为条件路由、文本路由、脚本路由。路由工厂 RouterFactory
是一个 SPI 接口,用户可以自行通过实现 Router
接口扩展 Router 类;在调用的时候,在 URL 的 protocol
参数中可以设置 file / script / condition,分别寻找对应的实现类。
条件路由使用的是 condition://协议
,URL 形式是:“condition://0.0.0.0/com.foo.DemoService?category=routers&dynamic=false&rule=” + URL.encode(“host = 10.20.153.10 => host = 10.20.153.11”)
;每个参数都是有含义的:
参数名 | 含义 |
---|---|
condition:// | 路由类型为条件路由(可扩展) |
0.0.0.0 | 对全部 IP 生效,填入具体 IP,则只对该 IP 生效 |
com.foo.DemoService | 对指定服务生效,必填 |
category=routers | 当前设置指该数据为动态配置类型,必填 |
dynamic=false | 当前设置表示该数据为持久数据,必填 |
enable=true | 覆盖规则生效,默认生效 |
force=false | 路由结果为空时,是否强制执行,默认为 false,路由为空时将自动失效 |
rule=… | 路由规则内容,必填 |
条件路由最关键的部分在于 rule 的路由规则。以下面的路由规则为例:
method = find* => host = 192.168.1.22
find
开头的方法,都会被路由到 192.168.1.22 的服务节点上;=>
之前部分是服务消费者匹配条件; =>
之后部分是服务提供者列表的过滤条件; $protocol
等占位符方式,也支持 =, !=
等条件,也支持通配符 *
。条件路由的具体实现类是 ConditionRouter
,整体的思想是通过正则表达式,按照 =>
进行分割,然后对符号前后的内容进行正则表达式的匹配,匹配结果存入对象 MatchPair
中。对于上述的占位符、通配符等,MatchPair
会进行匹配解析。
注:条件路由的整体思路,类似于笔者设计的动态汇总统计业务。
文件路由通常和脚本路由搭配使用。文件路由将规则写到文件中,文件中写的是自定义的脚本规则,脚本可以是 Javascript, Groovy 等,文件路由 FileRouter
找到对应文件,将文件中的脚本内容按照类型匹配脚本路由,执行解析。
脚本路由使用 JDK 自带的脚本解析器,对脚本解析并运行,默认使用 Javascript 解析器。在构造脚本路由时初始化脚本执行引擎,根据脚本不同的类型,通过 JDK 提供的 ScriptEngineManager
创建不同的脚本执行器。接收到脚本内容后,执行 route 方法。具体的过滤逻辑需要用户自行定义。
注:在笔者设计的动态汇总统计业务中,笔者使用了 Aviator 表达式引擎,它与脚本路由中的脚本执行器
ScriptEngineManager
类似。
很多容错策略在路由选择出所有可用 Invoker 列表中实行最后一步筛选,负载均衡。
负载均衡的核心是 LoadBalance
接口及其子类具体实现的,但并不是直接使用 LoadBalance
方法。在容错策略中的负载均衡先使用了抽象父类 AbstractClusterInvoker
中定义的 Invoker select
方法,它在 LoadBalance
基础上又封装了一些特性:
select
方法也使用了模板模式,在 select
方法中处理通用逻辑,最后提供 doSelect
抽象方法供各子类具体实现。Dubbo 内置了四种负载均衡算法,此外由于 LoadBalance
接口带有 @SPI 注解,所以用户也可以自行扩展负载均衡算法。在调用方法时我们可以在 URL 中通过 loadbalance=xxx
动态指定 select 方法的负载均衡算法。
根据权重,设置随机概率做负载均衡。
LeastActive 就是最少活跃调用负载均衡,Dubbo 在运行过程中会统计每一次 Invoker 的调用,每次从活跃数最少的 Invoker 中选一个节点。
一致性 Hash 的原理见《数据结构与算法》篇第五章。
Dubbo 的一致性 Hash 负载均衡,将接口名 + 方法名作为 Key 值,类型为 ConsistentHashSelector
实例对象作为 Value 存入一个 ConcurrentHashMap 中。每次请求进入,解析请求获取到方法,将该方法转为 Key 值,找到对应的 ConsistentHashSelector
进行负载均衡。所以 ConsistentHashSelector
是 Dubbo 中一致性 Hash 实现的核心。
ConsistentHashSelector
的环形散列是用 TreeMap 实现的,所有真实节点、虚拟节点都放在 TreeMap 中。将节点的 IP + 递增数字,然后作 MD5 计算,最后进行 Hash 计算,作为 TreeMap 的 Key 值。TreeMap 的 Value 值为对应的某个可以调用的节点。关键代码如下:
// 遍历所有节点
for (Invoker<T> invoker : invokers) {
// 得到每个节点的 IP
String address = invoker.getUrl().getAddress();
// replicaNumber 是生成的虚拟节点数量,默认 160 个
for (int i = 0; i < replicaNumber / 4; i++) {
// 对 IP + 递增数字作 MD5 计算,作为节点标识
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 对标识作 Hash 计算,作为 TreeMap 的 Key 值
long m = hash(digest, h);
// 当前 Invoker 为 Value
virtualInvokers.put(m, invoker);
}
}
}
每次请求进来后,进行上述的 Key 值运算,每次请求的参数都不同,但是由于 TreeMap 是有序的树形结构,所以可以调用 TreeMap#ceilingEntry
方法,找到最近一个大于或等于给定 Key 值的节点 Entry。这样的操作相当于一致性 Hash 算法的顺时针向前查找的效果。