前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于扩展点,为dubbo支持跨业务调用

基于扩展点,为dubbo支持跨业务调用

作者头像
吴就业
发布2022-03-29 09:12:47
3120
发布2022-03-29 09:12:47
举报
文章被收录于专栏:Java艺术Java艺术

很多规模稍大点的公司,内部都会有多个业务部门,这些业务部门都有自己的业务产品。每个业务部门开发的产品部署的环境物理上也都是相对隔离的,但这些业务部门之间可能存在合作关系,业务关联,因此就有了跨业务RPC调用的需求。

Dubbo的分层架构,提供的各层扩展点,让Dubbo具备了优秀的扩展性。我们基于Dubbo二次开发,借助Registry扩展点、RouterFactory扩展点实现了跨业务RPC调用,不需要修改Dubbo的源码。

简要概括原理

dubbo消费者refer的过程中,会创建一个RegistryDirectory实例,用于缓存服务提供者以提升性能,不必每次都去注册中心查找。

一个消费者可以从多个提供者中选择一个调用,因此消费者端的Invoker经过cluster层包装了路由、负载均衡的逻辑。

实现跨业务RPC调用,只需要让RegistryDirectory实例能够获取其它业务环境的提供者,再通过路由器选择目标业务环境的所有提供者。

让RegistryDirectory实例获取其它业务环境的提供者,简单的实现,就是同时订阅其它业务环境的注册中心。也就是修改RegistryDirectory的subscribe方法,订阅多个注册中心。不过Dubbo提供有扩展点,不需要修改源码,即Registry扩展点。

实现过程

如何利用扩展点,我们从RegistryProtocol#doRefer方法寻找突破口。

代码语言:javascript
复制
public class RegistryProtocol implements Protocol {
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // .....
        // 订阅提供者、配置、路由
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        
        return invoker;
    }
}

doRefer首先为消费者创建RegistryDirectory,然后调用RegistryDirectory实例的subscribe方法,指定只订阅服务提供者、动态配置、路由(消费者不需要订阅消费者)。

代码语言:javascript
复制
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
    public void subscribe(URL url) {
        setConsumerUrl(url);
        // 
        registry.subscribe(url, this);
    }
}

RegistryDirectory实例则调用Registry实例的subscribe方法实现订阅,而这个Registry实例就是我们自定义的MyRegistry。

代码语言:javascript
复制
public class MyRegistry extends AbstractRegistry{

    private final ConcurrentMap<String, Registry> registries = new ConcurrentHashMap<>();
  
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        for (String business : otherBusinesss) {
            Registry registry = registries.get(business);
            // 
            registry.subscribe(businessUrl, new CrossBusinessListener(listener,business));
        }
    }
}

MyRegistry本身不实现订阅注册中心的逻辑,而是借用设计模式管理多个业务的Registry的订阅。

CrossBusinessListener负责实现聚合所有业务的Registry监听到的服务提供者,并负责注册路由器,在后续发起RPC调用时,由路由器从聚合的提供者中,选择最优的提供者。

代码语言:javascript
复制
public class CrossRouterNotifyListener extends AbstractNotifyListenerDelegate {
  
   private NotifyListener listener;
   private String business;
  
   public CrossRouterNotifyListener(NotifyListener listener, String business){
     this.listener = listener;
     this.business = business;
   }
  
    @Override
    public void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return;
        }
      
        // 为每个url添加business=xxx参数,为了实现后续的路由
        urls = addBusinessEnvParam(urls);

        // 聚合所有环境注册的提供者到CrossServiceProviderManager
        URL firstUrl = urls.get(0);
        String businessEnv = firstUrl.getParameter(firstUrl.getParameter(LzDubboConstant.BUSINESS_ENV);
        CrossServiceProviderManager.putBusinessProvider(businessEnv, urls);

        // 从CrossServiceProviderManager取可用的提供者
        List<URL> newUrls = new ArrayList<>();
                                                   
        // 如果本业务环境有则优先使用,否则取其它业务环境的
        newUrls.addAll(CrossServiceProviderManager.smartChoiceBusinessProvider(businessEnv));
       
        // 前面注册的路由不能丢
        for (URL url : urls) {
            if (url.getProtocol().equals(Constants.ROUTE_PROTOCOL)) {
                newUrls.add(url);
            }
        }
        // 添加CrossBusinessRouter路由
        newUrls.add(crossBusinessRouter2Url());

        super.notify(newUrls); // 通知RegistryDirectory更新本地缓存
    }

}

CrossRouterNotifyListener代理了RegistryDirectory(listener)。多个业务环境的CrossRouterNotifyListener实例代理的是同一个RegistryDirectory实例,所以,当其中一个CrossRouterNotifyListener实例调用RegistryDirectory实例的notify方法刷新缓存时,会覆盖其它CrossRouterNotifyListener实例刷新的结果。因此,RegistryDirectory实例能够感知到其中任何一个业务的提供者变化。

在dubbo中,url是各层、甚至是各功能的衔接剂。添加路由器通过添加路由url实现,由RegistryDirectory实例将路由url转为路由实例。

代码语言:javascript
复制
 // CrossRouterNotifyListener#crossBusinessRouter2Url
  private URL crossBusinessRouter2Url() {
        String sb = Constants.ROUTE_PROTOCOL + "://" + Constants.ANYHOST_KEY + "/" +
                subscribeUrl.getServiceInterface() + "?" +
                Constants.CATEGORY_KEY + "=" + Constants.ROUTERS_CATEGORY +
                "&" + Constants.ROUTER_KEY + "=" + "crossBusinessRouter" + // 最重要的是这个参数
                ;
        return URL.valueOf(sb);
  }

路由器类型还要在SPI中注册:

代码语言:javascript
复制
# /resources/META-INF/dubbo/cm.alibaba.dubbo.rpc.cluster.RouterFactory
crossBusinessRouter=fm.lizhi.dubbo.cluster.router.CrossBusinessRouterFactory

在发起rpc请求时,RegistryDirectory实例先从本地缓存取得所有提供者,然后调用路由器的route方法,获取路由后的服务提供者。

代码语言:javascript
复制
public class CrossBusinessRouter{
    
    private String myBusinessEnv = System.getProperty("metadata.business");
  
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (invocation.getInvoker() == null) {
            // 不是真正的调用
            return invokers;
        }

        // 将invokers按business转为Map
        Map<String, List<Invoker<T>>> categoryInvokerMap = categorize(invokers);

        // 优先选择本业务的提供者,其中myBusinessEnv是本进程部署的业务环境
        List<Invoker<T>> candidates = categoryInvokerMap.get(myBusinessEnv);
        if (candidates != null && !candidates.isEmpty()) {
            return candidates;
        }
        
        // 选择其它业务环境的提供者
        for (String business : allBusiness) {
            candidates = categoryInvokerMap.get(business);
            if (candidates != null && !candidates.isEmpty()) {
                return candidates;
            }
        }

        return Collections.EMPTY_LIST;
    }
  
}

CrossBusinessRouter实现优先选择本业务的提供者,没有则走跨业务调用,即选择其它业务有的提供者。

路由之后就是负载均衡的逻辑了。

扩展实现

以上案例只是一个简单的实现,前提条件是各业务环境内网网络互通,而我们实现的跨环境RPC调用还比较复杂,需要同时支持跨大区机房、跨业务环境,组合起来就有三种可能:同业务跨区域、同区域跨业务、跨业务跨区域,并且我们约定业务之间不能直接RPC调用,需要由一层代理转发,防止腐化。

首先增加一个数据同步服务,负责同步zk数据到其它环境的zk,只同步接口,不同步注册上的提供者。

代理服务,在调用端伪装成服务提供者,在提供者端伪装成服务消费者。

假设A业务提供UserService接口,B业务需要调用A业务的UserService接口,代理服务为C,那么:

  • C注册到B的zk上,伪装为UserService的提供者。
  • C注册到A的zk上,伪装为UserService的消费者。
  • B调用A的UserService接口,经过代理服务C,转发给A。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-02-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java艺术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简要概括原理
  • 实现过程
  • 扩展实现
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档