抱歉,你查看的文章不存在

【手撕源码】Dubbo的工作机制&源码分析

Dubbo是基于Netty设计的

而Netty基于NIO

先简单对NIO和Netty作简单了解

1. NIO

NIO(Non-Blocking IO)起源于jdk1.4

NIO与BIO的作用是相同的,但使用方式完全不同

NIO支持面向缓冲区的、基于通道的IO操作,效率更高

NIO与BIO的区别:

  • BIO:面向流,阻塞
  • NIO:面向缓冲区,非阻塞,基于选择器
  • 使用NIO,在程序读取文件时,建立的不再是BIO中的“流”,而是通道
  • 数据传输的载体是缓冲区,这个缓冲区需要流动在通道里
    • 可以将通道理解为铁路,缓冲区理解为火车:铁路本身不具备传输能力,但它却是火车通行的必要条件

在实际高并发中,如果使用BIO,要为每一个请求都创建一个Socket,而每个Socket又需要对应的IO流,每个IO流又需要单独的线程,这样就会导致大量线程阻塞,系统运行缓慢。

如果换用NIO,因为所有的流都被换为通道Channel,而通道中使用缓冲区Buffer传输数据。所有的通道都交给一个选择器Selector(也被称为多路复用器),由它来监听这些通道的事件,来分别执行不同的策略。

2. Netty

Netty是一个异步事件驱动网络应用程序框架

它通常用于开发可维护高性能协议服务器和客户端

使用Netty,可以极大简化TCP和UDP的SocketServer等网络编程

Netty基于NIO,主要的底层逻辑如下:

  1. Netty服务器启动,并监听某一个端口
  2. 服务器启动时,会初始化Netty的通道(NioServerSocketChannel),并注册到Selector(暂且叫AcceptSelector)中,这个AcceptSelector只关心accept事件
  3. AcceptSelector会轮询Netty通道的accept事件
  4. 当监听到accept事件后,该AcceptSelector会处理accept事件,并与客户端建立起一个新的通道(NioSocketChannel)
  5. 之后,将这个新的通道注册到另一个Selector(暂且叫ReadWriteSelector)中,这个ReadWriteSelector只感兴趣read和write事件
  6. ReadWriteSelector轮询与客户端连接的通道,当监听到read就绪时,Netty会从通道中读数据(基于任务队列),write事件同理
  7. 当read和write完成预定目标后,整个流程结束

另外,Netty底层有一个Boss线程组,还有一个Worker线程组

  • Boss线程组负责监听主线程Netty监听的端口的连接就绪事件
  • Worker线程组负责具体的读写工作

-----

Dubbo是基于RPC调用的,也需要了解RPC的通信原理

3. RPC原理

一次完整的RPC调用过程需要经历以下步骤

  1. 服务消费方以本地调用的方式,调用接口的方法(不是真正调用)
  2. 消费方RPC代理接收到调用请求后,将调用的方法、传入的参数封装(序列化)成可以进行网络通信的消息体
  3. 消费方RPC代理找到服务地址,并将消息发送给服务提供方
  4. 提供方RPC代理收到消息后进行消息解码
  5. 提供方RPC代理根据解码后的消息,调用实际的本地服务(真正调用)
  6. 本地服务执行后将结果返回给提供方RPC代理
  7. 提供方RPC代理将返回的结果封装(序列化)成消息体,传送给消费方RPC代理
  8. 消费方RPC代理收到服务调用结果后进行消息解码
  9. 服务消费方真正得到消息结果

对于实际开发中,只需要使用第1步和第9步,RPC屏蔽掉了2-8的步骤

4. Dubbo框架设计

官方文档:http://dubbo.apache.org/zh-cn/docs/dev/design.html

4.1 Business部分

在Business部分,只有一个层面:Service

对于Service层,只是提供一个ServiceInterface,再在Provider中写对应的ServiceImpl即可

也就是说,对于应用而言,到这里就足以了

下面的部分全部都是Dubbo的底层原理

4.2 RPC部分

自上往下,依次有:

  • config 配置层:对外配置接口
    • 它是封装配置文件中的配置信息
    • 以ServiceConfig, ReferenceConfig为中心
    • 可以直接初始化配置类,也可以通过Spring解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端Stub和服务器端Skeleton
    • 它实际上就是辅助生成RPC的代理对象
    • 以ServiceProx为中心,扩展接口为ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现
    • 这一层就是注册中心的核心功能层(服务发现、服务注册)
    • 以服务URL为中心,扩展接口为RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心
    • 这一层的调用者Invoker可以保证多台服务器的服务调用,以及实现负载均衡
    • 以Invoker为中心,扩展接口为Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC调用次数和调用时间监控
    • 以Statistics为中心,扩展接口为MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装RPC调用
    • 这一层是RPC的调用核心
    • 以Invocation, Result为中心,扩展接口为Protocol, Invoker, Exporter

4.3 Remoting部分

自上往下,依次有:

  • exchange 信息交换层:封装请求响应模式,同步转异步
    • 这一层负责给服务提供方与服务消费方之间架起连接的管道
    • 以Request, Response为中心,扩展接口为Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象mina和netty为统一接口
    • 这一层负责真正的网络数据传输,Netty也就在这一层被封装
    • 以Message为中心,扩展接口为Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具
    • 扩展接口为Serialization, ObjectInput, ObjectOutput, ThreadPool

5. Dubbo启动流程

因为Dubbo的配置文件实际是Spring的配置文件

而Spring解析配置文件,最终都是回归到一个接口:BeanDefinitionParser

那就意味着,Dubbo肯定也提供了对应的BeanDefinitionParser:DubboBeanDefinitionParser

在BeanDefinitionParser中只定义了一个方法:

BeanDefinition parse(Element element, ParserContext parserContext);

下面剖析IOC容器启动时标签的解析机制

5.1 Dubbo的标签解析机制

5.1.1 进入parse方法

可以看到parse方法仅有一句:

public BeanDefinition parse(Element element, ParserContext parserContext) {
    return parse(element, parserContext, beanClass, required);
}

注意这里的beanClass是成员变量而不是参数!

那难道说,每次解析标签,都是一个全新的DubboBeanDefinitionParser?

5.1.2 自行声明的parse方法

由于该方法太长,只取关键部分

if (ProtocolConfig.class.equals(beanClass)) {
    for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
        BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
        PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
        if (property != null) {
            Object value = property.getValue();
            if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
                definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
            }
        }
    }
} else if (ServiceBean.class.equals(beanClass)) {
    String className = element.getAttribute("class");
    if (className != null && className.length() > 0) {
        RootBeanDefinition classDefinition = new RootBeanDefinition();
        classDefinition.setBeanClass(ReflectUtils.forName(className));
        classDefinition.setLazyInit(false);
        parseProperties(element.getChildNodes(), classDefinition);
        beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
    }
} else if (ProviderConfig.class.equals(beanClass)) {
    parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
} else if (ConsumerConfig.class.equals(beanClass)) {
    parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
}

这一部分在判断beanClass的类型!

可Dubbo的配置文件中并没有写class=…之类的内容,它怎么知道的?

那既然不知道beanClass从哪儿来的,就需要追到构造方法中了

5.1.3 DubboBeanDefinitionParser的构造方法

public DubboBeanDefinitionParser(Class<?> beanClass, boolean required) {
    this.beanClass = beanClass;
    this.required = required;
}

构造方法很简单,但这个Class<?>的参数从哪儿来的?

5.1.4 追溯到创建DubboBeanDefinitionParser的前一步

通过Debug可以发现,在这一步之前,还有一个调用过程,是调用了一个叫DubboNamespaceHandler的init方法

public void init() {
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

从这个方法中,可以看到,这是把Dubbo配置文件中可能出现的标签,全部转化为DubboBeanDefinitionParser了

也就是说,并不是解析一个标签就创建一个DubboBeanDefinitionParser,而是把所有可能出现的标签全部穷举了,创建出对应的DubboBeanDefinitionParser,这样遇到哪个标签,就使用哪个标签的解析器而已

而且注意到每次传入的Class对象,要么是xxxConfig,要么是xxxBean

  • xxxConfig是标签中的配置信息,只能出现一次
  • xxxBean是发布的服务/引用的服务,每组标签都会创建一个Bean

对于xxxConfig,只是把配置封装到对应的Config类里就可以了

但提供方发布的服务、消费方的服务引用,是另外一种机制

5.2 Dubbo的服务发布机制

所有的服务都被封装为ServiceBean,而ServiceBean类的定义信息比较复杂

public class ServiceBean<T> extends ServiceConfig<T> 
        implements InitializingBean, DisposableBean, ApplicationContextAware, 
                   ApplicationListener<ContextRefreshedEvent>, BeanNameAware

这个ServiceBean实现了几个接口:

  • InitializingBean:实现该接口后,Spring会在该Bean创建完毕后回调afterPropertiesSet方法做后置处理
  • ApplicationListener<ContextRefreshedEvent>:实现该接口后,当Spring的IOC容器初始化完毕后,会回调onApplicationEvent方法

5.2.1 实现InitializingBean的afterPropertiesSet方法

原源码太长,取部分源码:

public void afterPropertiesSet() throws Exception {
        if (getProvider() == null) {
            Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
            if (providerConfigMap != null && providerConfigMap.size() > 0) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                        && providerConfigMap.size() > 1) { // backward compatibility
                    List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() != null && config.isDefault().booleanValue()) {
                            providerConfigs.add(config);
                        }
                    }
                    if (!providerConfigs.isEmpty()) {
                        setProviders(providerConfigs);
                    }
                } else {
                    ProviderConfig providerConfig = null;
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (providerConfig != null) {
                                throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                            }
                            providerConfig = config;
                        }
                    }
                    if (providerConfig != null) {
                        setProvider(providerConfig);
                    }
                }
            }
        }
        if (getApplication() == null
                && (getProvider() == null || getProvider().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                ApplicationConfig applicationConfig = null;
                for (ApplicationConfig config : applicationConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (applicationConfig != null) {
                            throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                        }
                        applicationConfig = config;
                    }
                }
                if (applicationConfig != null) {
                    setApplication(applicationConfig);
                }
            }
        }
        //……………(已忽略)
        if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }
        if (!isDelay()) {
            export();
        }
    }

在每一个主干if结构中,都能看到最后的一句话:

if (providerConfig!= null) {…}

如果判定成功,会把此前创建的那些Dubbo的配置标签内容都存到这些ServiceBean中

5.2.2 实现ApplicationListener的onApplicationEvent方法

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

它调用了一个export方法

5.2.3 【关键】回调的export方法:暴露服务

该方法来源于父类ServiceConfig

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

前面还是一些判断,关键的步骤在下面的起线程调用doExport方法

5.2.4 doExport方法

    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        checkDefault();
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls();
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

还是好多好多的判断、检查,到最后执行了一个doExportUrls方法

5.2.5 doExportUrls方法

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

这个方法就是暴露服务的URL地址的!

这个方法先加载了注册中心的URL,之后for循环了一组protocols

这组protocols实际上是Dubbo配置文件中配置的protocol标签

之后它拿到这个protocol配置和注册中心的URL,执行了一个doExportUrlsFor1Protocol方法

5.2.6 doExportUrlsFor1Protocol方法

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && !arguments.isEmpty()) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // export service
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

前面的大量内容还是做配置,关键的暴露部分在最后的几行

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, 
                         registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

先是拿到代理工厂,去获取了一个Invoker执行器,从代码层面也可以看出来,它其实是利用代理技术,将接口对应的实现类对象(ref),与该服务的暴露地址等信息封装到一个新的代理对象中,这个对象就是Invoker执行器。

之后又调用了protocol的export方法

这个protocol对象,在上面可以看到它是使用了如下方法:

  • ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

这个方法其实是用的Java的SPI技术

5.2.6.1 【扩展】Java的SPI技术

关于Java中原生的SPI技术,可以参照博客

https://www.cnblogs.com/zhongkaiuu/articles/5040971.html

5.2.6.2 【扩展】Dubbo的SPI技术

Dubbo没有使用jdk原生的SPI技术,而是自己另写了一套

它的ExtensionLoader就可以类比于jdk的ServiceLoader,它也是用来加载指定路径下的接口实现

更详细的Dubbo-SPI,可以参照博客

https://blog.csdn.net/qiangcai/article/details/77750541

5.2.7 protocol的export方法

在Dubbo发布服务时,由于涉及到注册中心和服务本身,也就意味着需要找Protocol的两个实现类,去调用他们的export方法

通过查找Protocol的实现类,可以发现有两个需要使用的实现类:

RegistryProtocol,DubboProtocol

实际在进行Debug的时候,发现先进RegistryProtocol

5.2.8 RegistryProtocol的export方法

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        boolean register = registeredProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

        if (register) {
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }

这个方法传入的参数就是上面的那个服务接口的实现类的代理对象

这里面有两个关键的步骤:doLocalExport和ProviderConsumerRegTable.registerProvider

按照执行顺序,先执行doLocalExport方法

5.2.9 doLocalExport方法

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

在这段中,一开始这个exporter是空的,需要使用双检锁来构造一个exporter(懒汉单例模式的思路)

而创建的时候,是先构造了一个invokerDelegete,这个对象中包含两份invoker,都是该服务的实现类代理

之后又调用了这个RegistryProtocol中聚合的另一个protocol的export方法

这个protocol就是上面提到的DubboProtocol

5.2.10 DubboProtocol的export方法

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

其实这个DubboProtocol不是指这个Dubbo框架,而是指发布服务的协议使用的是dubbo协议

此外,从上面的Protocol的实现类来看,还有http的,Hessian的,WebService的,等等等

所以这里的Dubbo并不是指框架,而是dubbo协议

这个方法一开始先从Invoker中拿到了url,之后封装成一个DubboExporter对象

之后进行一部分if检查后来到了另外一个关键的方法:openServer

5.2.11 【关键】openServer方法

   private void openServer(URL url) {
       // find server.
       String key = url.getAddress();
       //client can export a service which's only for server to invoke
       boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
       if (isServer) {
           ExchangeServer server = serverMap.get(key);
           if (server == null) {
               serverMap.put(key, createServer(url));
           } else {
               // server supports reset, use together with override
               server.reset(url);
           }
       }
   }

在这里,因为一开始是没有ExchangeServer的,需要调用createServer方法

5.2.12 createServer方法

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

在创建ExchangeServer时,核心是try-catch中的那一句Exchanges.bind方法,要对url和requestHandler进行绑定

5.2.13 【Netty】Exchanges.bind方法

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

从这里开始,就一步一步导向Netty了

因为这里前面都是检查,到最后return的时候,又是调用了另一个Exchanger的bind方法

这部分过程还是使用了SPI技术

使用IDE引导查看这个bind方法,会来到另外一个Exchanger:HeaderExchanger

5.2.13.1 HeaderExchanger的bind方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

在这个方法中,又是调用了Transporters的静态方法bind

5.2.13.2 Transporters的bind方法

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

可以发现这里面已经出现ChannelHandler了,也就是NIO相关的东西了!

最后一步还是bind,不过这次不再是Exchanger的bind,而是Transporter的bind

而这个Transporter,不再是某一个固定的实现类了,而是几个实现类中的一个

也就是这个地方,出现了使用Netty

这也就说明了Dubbo的RPC底层使用的是Netty

到这儿,openServer方法就执行完毕了

这段源码实际做的事情,是Netty底层启动,并监听服务暴露的地址

5.2.14 回到DubboProtocol的export方法

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // ………

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

数据交换服务器启动后,将url序列化一下,整段export方法就执行完毕了

5.2.15 回到RegistryProtocol的export方法

前面提到了export方法中第二个关键方法是ProviderConsumerRegTable.registerProvider,进入到该方法:

5.2.16 ProviderConsumerRegTable.registerProvider方法

    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();

    public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
        ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
        String serviceUniqueName = providerUrl.getServiceKey();
        Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
        if (invokers == null) {
            providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
            invokers = providerInvokers.get(serviceUniqueName);
        }
        invokers.add(wrapperInvoker);
    }

注意这个类的名:提供方-消费方的注册表,也就是说,这个类会保存服务提供方和消费方的调用地址和代理对象

可以看到在ProviderConsumerRegTable类中保存了两个Map,这两个Map实际上是保存了服务URL以及执行这个服务的Invoker(这些Invoker里面有被代理的真正的服务实现类对象)

等上述方法完全执行完毕后,Dubbo服务发布完成。

5.3 Dubbo的服务消费机制

所有的服务引用都被封装为ReferenceBean,而ReferenceBean类的定义信息比较复杂

public class ReferenceBean<T> extends ReferenceConfig<T> 
        implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean

这个ReferenceBean实现了几个接口:

  • FactoryBean:Spring的工厂Bean
  • InitializingBean:实现该接口后,Spring会在该Bean创建完毕后回调afterPropertiesSet方法做后置处理

5.3.1 实现FactoryBean的效果

因为ReferenceBean实现了该接口,所以在Controller引用这些Service时,由于Spring要完成自动注入,所以要调用这个ReferenceBean的getObject方法

5.3.2 调用getObject方法初始化服务引用

    public Object getObject() throws Exception {
        return get();
    }

    //该方法来源于父类ReferenceConfig
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

它调用了父类的get方法,而在父类ReferenceConfig中定义了init方法

5.3.3 init方法初始化服务引用

    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // get consumer's global configuration
        checkDefault();
        appendProperties(this);
        if (getGeneric() == null && getConsumer() != null) {
            setGeneric(getConsumer().getGeneric());
        }
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        // ………………

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

前面还是跟服务发布一样,大量的检查判断

到最后有一句:createProxy,它负责创建服务的引用代理

5.3.4 【关键】createProxy方法

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

这个方法传入了一个Map,里面存了注册中心的相关配置、服务的相关信息(接口名、方法名、超时时间等)

到中间,有一句最关键的:

invoker = refprotocol.refer(interfaceClass, urls.get(0));

它要使用refProtocol去注册中心远程引用目标服务接口的代理引用(url的内容就是注册中心的地址)

而这个Protocol又是跟上面一样,先进RegistryProtocol,再进DubboProtocol

5.3.5 RegistryProtocol的refer方法

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

与上面调用export类似,最关键的方法还是以do开头的:doRefer方法

5.3.6 doRefer方法

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

在整个方法调用中,最核心的是去注册中心订阅服务

也就是说,这一步就是负责去订阅服务提供者暴露的服务

当执行cluster.join方法时,会跳转到DubboProtocol的refer方法中(SPI)

5.3.7 DubboProtocol的refer方法

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

与服务发布类似,不过在这里创建的是DubboInvoker,而这里面最关键的是那一句getClients

5.3.8 【关键】getClients方法

    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

这个方法跟上面的openServer类似

而这些客户端从哪里来,需要调用getSharedClient或initClient

5.3.9 getSharedClient方法与initClient方法

    private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }

            ExchangeClient exchangeClient = initClient(url);
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }

    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

可以发现,getSharedClient方法最终也是调用的initClient方法,所以核心还是在initClient方法中 前面还是值的检验、判空等逻辑

在最后的try-catch结构中,针对是否为延迟连接,有两种策略:

  • if中是延迟连接,在应用启动时不连接到注册中心
  • else中是非延迟连接,应用启动时就与注册中心建立连接

由于大多属配置都是预先连接,故继续看Exchangers的connect方法

5.3.10 【Netty】Exchangers.connect方法

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

在最后一步,看到了与上面bind方法极为相似的源码

这也就意味着,这里面的底层也是Netty

5.3.11 回到RegistryProtocol的refer方法

服务订阅完成后,返回的还是Invoker,里面存的是两份directory,每一份directory中存放着服务的信息、客户端的信息等

最后一步,又是那个注册表,这次它把服务消费方也注册进去了(不再贴源码)

最终回到init方法,初始化完毕。


6. Dubbo的服务调用流程

Dubbo的调用链在官方文档中也有描述:http://dubbo.apache.org/zh-cn/docs/dev/design.html

以源码解析为标准,分析调用过程

xxxService.xxxMethod();

当远程调用方法时,经历了如下过程

6.1 代理对象调用服务

上述的xxxService其实是一个代理对象,它基于InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

可以发现,在除了上面的几个特殊的方法,剩下的都是统一的调用方案:

invoker.invoke(new RpcInvocation(method, args)).recreate();

在调用之前,要先把调用的方法和参数统一封装成一个RpcInvocation对象

之后再调用invoke方法,它传到了MockClusterInvoker类的invoke方法

(MockClusterInvoker是做本地伪装,意为如果建立连接后服务器宕机,客户端不抛异常,而采取抛null)

(有关本地伪装的配置可参照官方文档:http://dubbo.apache.org/zh-cn/docs/user/demos/local-mock.html

6.2 MockClusterInvoker的invoke

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }

在这里面,第一个if结构里又一个invoker.invoke方法,这个invoker实际上是FailoverClusterInvoker(集群容错与负载均衡的Invoker)

这就应该意识到这是层层包装的结构了,类似于拦截器机制

再往里走,因为FailoverClusterInvoker有父类AbstractClusterInvoker,它重写了invoke方法(FailoverClusterInvoker并没有重写)

6.3 AbstractClusterInvoker的invoke方法

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }

        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

在中间,它要调用list方法来获取一组Invoker,这个方法是去注册中心找有多少个能远程调用该服务的可用服务Invoker

——| 其实list方法里只有一句:directory.list(invocation);

之后的if结构是负责处理负载均衡的

最后return的时候又去调doInvoke方法

由于在AbstractClusterInvoker中该方法为模板方法,根据多态,又回到FailoverClusterInvoker中调用

6.4 FailoverClusterInvoker的doInvoke方法

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException{
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

在循环的try-catch中,又是invoker.invoke方法

但这里面不再是ClusterInvoker了,根据调用流程图,可以发现,接下来是一组Filter

把这组Filter走完之后,最终负责执行的是DubboInvoker

6.5 DubboInvoker的doInvoke方法

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

第一组if-else中,需要拿到当前请求的客户端,之后再try-catch的最后一个return中执行request方法

这个方法就是真正请求远程调用的方法

6.6 【Netty】client.request方法

这个request经历了两次跳转,最终来到HeaderExchangeChannel类的request方法

//ReferenceCountExchangeClient
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return client.request(request, timeout);
    }

//HeaderExchangeClient
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return channel.request(request, timeout);
    }

//HeaderExchangeChannel
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

在最后的try-catch中,用channel把请求发送出去

而这个Channel,就是底层调用Netty的Channel进行通信调用了。

最终获取到结果后,判断一次是否超时,如果没有超时,完整的远程服务调用就完成了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

LinkedBear的个人空间

0 篇文章14 人订阅

扫码关注云+社区

领取腾讯云代金券