专栏首页Java升级之路Dubbo——服务目录

Dubbo——服务目录

引言

前面几篇文章分析了Dubbo的核心工作原理,本篇将对之前涉及到但却未细讲的服务目录进行深入分析,在开始之前先结合前面的文章思考下什么是服务目录?它的作用是什么?

正文

概念及作用

清楚Dubbo的调用过程就知道Dubbo在客户端和服务端都会为服务生成一个Invoker执行体,这个Invoker包含了所有的配置信息,也相当于是一个代理对象,所以这也就引发出几个问题:

  • 怎么管理Invoker?不可能让用户自己去管理,也不可能客户端每次调用服务时都新创建Invoker。
  • 当服务存在集群时,选择使用哪一个Invoker?怎么选择?
  • 服务列表变化时,Invoker列表怎么更新?

针对以上问题,Dubbo引入了服务目录的概念,简单的说就是Invoker的集合,由框架自身统一管理Invoker列表,并且提供订阅服务功能,使得服务变更时,会自动更新Invoker列表;同时当存在集群时,可以使得外部以统一的方法使用Invoker,即用户不用关心怎么选择使用哪一个Invoker(在之前的源码分析中我们看到过cluster.join就是实现该功能的API,将多个Invoker合成一个Invoker)。

继承结构

了解了基本概念后,我们来看看服务目录的继承体系:

Directory继承Node接口,该接口是Dubbo中节点的高度抽象,它提供了获取url配置、判断节点是否可用以及销毁节点的接口,由各个子类实现,只要是和服务节点相关的实现都可以实现该接口。比如Invoker、Directory、Registry等。目录的内置实现有StaticDirectory和RegistryDirectory两个,第一个是静态目录服务,其中的Inovker列表是不会改变的;而RegistryDirectory实现了NotifyListener接口,表示会监听注册中心节点的变化,当节点信息改变时,RegistryDirectory中的Inovker列表会自动更新。

源码分析

AbstractDirectory

从上面的继承图我们可以看到StaticDirectory和RegistryDirectory都继承了AbstractDirectory,可以猜到多半又是模板方法模式的实现,确实也是这样,AbstractDirectory提供了获取Invoker的接口,而具体的实现则是子类自行实现的,我们来看看其源码:

public abstract class AbstractDirectory<T> implements Directory<T> {

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed){
            throw new RpcException("Directory already destroyed .url: "+ getUrl());
        }
        // 获取invoker列表,由子类实现
        List<Invoker<T>> invokers = doList(invocation);
        // 本地路由列表
        List<Router> localRouters = this.routers; 
        if (localRouters != null && localRouters.size() > 0) {
            for (Router router: localRouters){
                try {
                	// 是否需要进行路由
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
    
    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ;

	// 省略其它方法

}

这个方法逻辑很简单,没什么好说的,其中路由配置可自行了解,本文不打算展开,下面主要看看子类是如何实现doList方法以及如何管理Inovker的。

RegistryDirectory

刚说了RegistryDirectory是动态的目录服务,会监听注册中心的节点变化,并自动刷新Invoker列表,用户可以通过它拿到实时的Invoker列表,下面就主要分析这三部分是如何实现的。

注册监听及自动更新Invoker列表

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

注册监听很简单,客户端创建Zookeeper连接时,会添加监听器监听节点变化,该监听器最终会调用到RegistryDirectory的subscribe方法,使得目录也可以监听节点变化,当节点发生变化时,又会触发NotifyListener的notify方法,RegistryDirectory就实现了该方法:

public synchronized void notify(List<URL> urls) {
	// 存放provider节点下的url
    List<URL> invokerUrls = new ArrayList<URL>();
    // 存放router节点下的url
    List<URL> routerUrls = new ArrayList<URL>();
    // 存放configurator节点下的url
    List<URL> configuratorUrls = new ArrayList<URL>();
    // 每次传入的url都是节点下所有的url,并非增量
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        // 根据url的协议和分类存放到对应的List中
        if (Constants.ROUTERS_CATEGORY.equals(category) 
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
        	// 忽略不支持的url
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // 缓存configurator url 
    if (configuratorUrls != null && configuratorUrls.size() >0 ){
        this.configurators = toConfigurators(configuratorUrls);
    }
    // 设置路由
    if (routerUrls != null && routerUrls.size() >0 ){
        List<Router> routers = toRouters(routerUrls);
        if(routers != null){ // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override参数
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // 根据provider节点下的url刷新invoker
    refreshInvoker(invokerUrls);
}

这个方法主要缓存各种类型的url以及配置路由,Invoker的刷新主要是在refreshInvoker方法中:

private void refreshInvoker(List<URL> invokerUrls){
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // invokerUrls中只有一个url且协议为empty,则禁用所有服务
        this.forbidden = true; // 禁止访问
        this.methodInvokerMap = null; // 置空列表
        destroyAllInvokers(); // 关闭所有Invoker
    } else {
        this.forbidden = false; // 允许访问
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
        	// invokerUrls为空但缓存中有url,则将缓存中的url添加到invokerUrls
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
        	// 将invokerUrls中的所有url缓存起来,便于交叉对比
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);
        }
        if (invokerUrls.size() ==0 ){
        	return;
        }
        // 将URL列表转成Invoker列表,key是url
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;
        // 换方法名映射Invoker列表
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
        // 转换出错,抛出异常
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
            return ;
        }
        // 合并多个invoker
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try{
        	// 销毁无用的invoker
            destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); 
        }catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

该方法首先会判断是否需要禁用服务,若不需要,则将url转化为Invoker,并将方法名映射到对应的Invoker,紧接着将多组Invoker合并后赋值给this.methodInvokerMap变量(该变量会在doList遍历Invoker时用到),最后会销毁掉缓存中已经无用的Invoker,避免调用到已怠机的服务。以上就是Invoker自动刷新的流程,其中各个依赖方法的细节感兴趣的可自行分析,下面就一起来看看如何获取Invoker列表。

获取Invoker列表

public List<Invoker<T>> doList(Invocation invocation) {
	// 服务提供者关闭或禁止了服务抛出异常
    if (forbidden) {
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +  NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
    }
    List<Invoker<T>> invokers = null;
    // 本地Invoker列表,在refreshInvoker中合并赋值
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; 
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
    	// 获取方法名和参数列表
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // 第一个参数为String或者Enum,不太清楚有什么意义
        if(args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
        }
        // 根据方法名获取Inovker列表
        if(invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        // 根据“*”获取Invoker列表
        if(invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        // 这里没有什么用处,新版中已经移除
        if(invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

这个逻辑也非常清晰,就是根据方法名或“*”拿到对应的Invoker列表。以上就是动态服务目录的实现原理,下面再来看看静态服务目录。

StaticDirectory

public class StaticDirectory<T> extends AbstractDirectory<T> {
    
    private final List<Invoker<T>> invokers;

    public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
        super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);
        if (invokers == null || invokers.size() == 0)
            throw new IllegalArgumentException("invokers == null");
        this.invokers = invokers;
    }
    
    public void destroy() {
        if(isDestroyed()) {
            return;
        }
        super.destroy();
        for (Invoker<T> invoker : invokers) {
            invoker.destroy();
        }
        invokers.clear();
    }
    
    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

        return invokers;
    }

}

相比较而言,静态服务目录就简单多了,通过构造器创建,由于没有提供更新的方法,所以一旦创建就不会改变,而读取Inovker列表只需要将自身变量返回即可。

总结

服务目录的原理我们搞清楚了,再结合前面几篇文章,我们能够掌握Dubbo的核心实现原理。现在我们再来看看Dubbo的架构图:

抛开种种繁杂的功能,你会发现这个架构和RMI以及我们之前手写实现的RPC架构没太大区别,所以,大道至简,掌握基础才能更加快速地理解更复杂的框架应用。 至此,Dubbo系列暂时就写到这了,但其本身做为一个优秀的开源框架,发展这么多年,不可能这几篇文章就涵盖完全了,其它的诸如序列化、路由、Monitor以及文中未做详细分析的部分,读者们可自行阅读源码分析。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Dubbo——服务发布原理

    在使用Dubbo的时候你一定会好奇它是怎么实现RPC的,而要了解它的调用过程,必然需要先了解其服务发布/订阅的过程,本篇将详细讨论Dubbo的发布过程。

    夜勿语
  • Dubbo——服务引用

    上一篇我们分析了服务发布的原理,可以看到默认是创建了一个Netty server,并通过Invoker调用服务,同样,在客户端也会创建一个Inovker对象,下...

    夜勿语
  • 你所不知道的Spring的@Autowired实现细节

    相信很多Java开发都遇到过一个面试题:Resource和Autowired的区别是什么?这个问题的答案相信基本都清楚,但是这两者在Spring中是如何实现的呢...

    夜勿语
  • 每周学点大数据 | No.67 Hadoop 实践案例——记录去重

    No.67 Hadoop 实践案例——记录去重 Mr. 王:现在我们看一个和 WordCount 很相似,在实际中应用也很多的例子——记录去重。 小可 :嗯,...

    灯塔大数据
  • SSHFD:IBM提出的单阶段人体跌倒检测网络

    论文地址:http://xxx.itp.ac.cn/pdf/2004.00797v2

    AI算法修炼营
  • go示例:LDAP简介

    LDAP(轻量级目录访问协议,Lightweight Directory Access Protocol)是实现提供被称为目录服务的信息服务。目录服务是一种特殊...

    mojocn
  • Python+树莓派制作IoT(物联网)门控设备

    因为考虑需要在户外使用这套物联网门控设备,所以利用树莓派完成这个设备有两个问题需要解决, 第一是需要解决树莓派和相关模块的供电问题。 第二就是需要户外没有宽带网...

    緣來
  • ​引入一项新技术前,我们该想清楚什么?

    很多小伙伴在工作遇到一定瓶颈的时候,都希望引入一些新技术来解决问题,比如最近经常在群里看到大家聊:

    木东居士
  • 《selenium2 python 自动化测试实战》(19)——Selenium工具介绍

    用户2149234
  • Eclipse安装SVN插件简记

    此处使用的Eclipse版本为Version: Luna Service Release 2 (4.4.2)

    汐楓

扫码关注云+社区

领取腾讯云代金券