很多规模稍大点的公司,内部都会有多个业务部门,这些业务部门都有自己的业务产品。每个业务部门开发的产品部署的环境物理上也都是相对隔离的,但这些业务部门之间可能存在合作关系,业务关联,因此就有了跨业务RPC调用的需求。
Dubbo的分层架构,提供的各层扩展点,让Dubbo具备了优秀的扩展性。我们基于Dubbo二次开发,借助Registry扩展点、RouterFactory扩展点实现了跨业务RPC调用,不需要修改Dubbo的源码。
dubbo消费者refer的过程中,会创建一个RegistryDirectory实例,用于缓存服务提供者以提升性能,不必每次都去注册中心查找。
一个消费者可以从多个提供者中选择一个调用,因此消费者端的Invoker经过cluster层包装了路由、负载均衡的逻辑。
实现跨业务RPC调用,只需要让RegistryDirectory实例能够获取其它业务环境的提供者,再通过路由器选择目标业务环境的所有提供者。
让RegistryDirectory实例获取其它业务环境的提供者,简单的实现,就是同时订阅其它业务环境的注册中心。也就是修改RegistryDirectory的subscribe方法,订阅多个注册中心。不过Dubbo提供有扩展点,不需要修改源码,即Registry扩展点。
如何利用扩展点,我们从RegistryProtocol#doRefer方法寻找突破口。
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// .....
// 订阅提供者、配置、路由
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
return invoker;
}
}
doRefer首先为消费者创建RegistryDirectory,然后调用RegistryDirectory实例的subscribe方法,指定只订阅服务提供者、动态配置、路由(消费者不需要订阅消费者)。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public void subscribe(URL url) {
setConsumerUrl(url);
//
registry.subscribe(url, this);
}
}
RegistryDirectory实例则调用Registry实例的subscribe方法实现订阅,而这个Registry实例就是我们自定义的MyRegistry。
public class MyRegistry extends AbstractRegistry{
private final ConcurrentMap<String, Registry> registries = new ConcurrentHashMap<>();
@Override
public void subscribe(URL url, NotifyListener listener) {
for (String business : otherBusinesss) {
Registry registry = registries.get(business);
//
registry.subscribe(businessUrl, new CrossBusinessListener(listener,business));
}
}
}
MyRegistry本身不实现订阅注册中心的逻辑,而是借用设计模式管理多个业务的Registry的订阅。
CrossBusinessListener负责实现聚合所有业务的Registry监听到的服务提供者,并负责注册路由器,在后续发起RPC调用时,由路由器从聚合的提供者中,选择最优的提供者。
public class CrossRouterNotifyListener extends AbstractNotifyListenerDelegate {
private NotifyListener listener;
private String business;
public CrossRouterNotifyListener(NotifyListener listener, String business){
this.listener = listener;
this.business = business;
}
@Override
public void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) {
return;
}
// 为每个url添加business=xxx参数,为了实现后续的路由
urls = addBusinessEnvParam(urls);
// 聚合所有环境注册的提供者到CrossServiceProviderManager
URL firstUrl = urls.get(0);
String businessEnv = firstUrl.getParameter(firstUrl.getParameter(LzDubboConstant.BUSINESS_ENV);
CrossServiceProviderManager.putBusinessProvider(businessEnv, urls);
// 从CrossServiceProviderManager取可用的提供者
List<URL> newUrls = new ArrayList<>();
// 如果本业务环境有则优先使用,否则取其它业务环境的
newUrls.addAll(CrossServiceProviderManager.smartChoiceBusinessProvider(businessEnv));
// 前面注册的路由不能丢
for (URL url : urls) {
if (url.getProtocol().equals(Constants.ROUTE_PROTOCOL)) {
newUrls.add(url);
}
}
// 添加CrossBusinessRouter路由
newUrls.add(crossBusinessRouter2Url());
super.notify(newUrls); // 通知RegistryDirectory更新本地缓存
}
}
CrossRouterNotifyListener代理了RegistryDirectory(listener)。多个业务环境的CrossRouterNotifyListener实例代理的是同一个RegistryDirectory实例,所以,当其中一个CrossRouterNotifyListener实例调用RegistryDirectory实例的notify方法刷新缓存时,会覆盖其它CrossRouterNotifyListener实例刷新的结果。因此,RegistryDirectory实例能够感知到其中任何一个业务的提供者变化。
在dubbo中,url是各层、甚至是各功能的衔接剂。添加路由器通过添加路由url实现,由RegistryDirectory实例将路由url转为路由实例。
// CrossRouterNotifyListener#crossBusinessRouter2Url
private URL crossBusinessRouter2Url() {
String sb = Constants.ROUTE_PROTOCOL + "://" + Constants.ANYHOST_KEY + "/" +
subscribeUrl.getServiceInterface() + "?" +
Constants.CATEGORY_KEY + "=" + Constants.ROUTERS_CATEGORY +
"&" + Constants.ROUTER_KEY + "=" + "crossBusinessRouter" + // 最重要的是这个参数
;
return URL.valueOf(sb);
}
路由器类型还要在SPI中注册:
# /resources/META-INF/dubbo/cm.alibaba.dubbo.rpc.cluster.RouterFactory
crossBusinessRouter=fm.lizhi.dubbo.cluster.router.CrossBusinessRouterFactory
在发起rpc请求时,RegistryDirectory实例先从本地缓存取得所有提供者,然后调用路由器的route方法,获取路由后的服务提供者。
public class CrossBusinessRouter{
private String myBusinessEnv = System.getProperty("metadata.business");
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (invocation.getInvoker() == null) {
// 不是真正的调用
return invokers;
}
// 将invokers按business转为Map
Map<String, List<Invoker<T>>> categoryInvokerMap = categorize(invokers);
// 优先选择本业务的提供者,其中myBusinessEnv是本进程部署的业务环境
List<Invoker<T>> candidates = categoryInvokerMap.get(myBusinessEnv);
if (candidates != null && !candidates.isEmpty()) {
return candidates;
}
// 选择其它业务环境的提供者
for (String business : allBusiness) {
candidates = categoryInvokerMap.get(business);
if (candidates != null && !candidates.isEmpty()) {
return candidates;
}
}
return Collections.EMPTY_LIST;
}
}
CrossBusinessRouter实现优先选择本业务的提供者,没有则走跨业务调用,即选择其它业务有的提供者。
路由之后就是负载均衡的逻辑了。
以上案例只是一个简单的实现,前提条件是各业务环境内网网络互通,而我们实现的跨环境RPC调用还比较复杂,需要同时支持跨大区机房、跨业务环境,组合起来就有三种可能:同业务跨区域、同区域跨业务、跨业务跨区域,并且我们约定业务之间不能直接RPC调用,需要由一层代理转发,防止腐化。
首先增加一个数据同步服务,负责同步zk数据到其它环境的zk,只同步接口,不同步注册上的提供者。
代理服务,在调用端伪装成服务提供者,在提供者端伪装成服务消费者。
假设A业务提供UserService接口,B业务需要调用A业务的UserService接口,代理服务为C,那么: