前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo 源码 v2.7 分析:核心机制(二)

dubbo 源码 v2.7 分析:核心机制(二)

原创
作者头像
程序员架构进阶
修改2021-03-03 09:35:07
2600
修改2021-03-03 09:35:07
举报
文章被收录于专栏:架构进阶架构进阶

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制

dubbo 源码 v2.7 分析:核心机制(一)

关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~

一 概述

上一篇重点讲了dubbo中的几种设计模式,和对应的源码。本篇会继续介绍Bean加载、Extension、代理几种机制在dubbo中的应用。

二 Bean加载

dubbo中,bean的加载是基于Spring的可扩展Schema机制。

2.1 Spring的可扩展Schema

在大多数场景,如果我们需要为系统提供可配置化支持,简单的做法是直接基于 Spring的标准 Bean 来配置;但如果配置较为复杂或者需要更多丰富控制,这种简单粗暴的方法会显得非常笨拙。一般的做法会用原生态的方式去解析定义好的 xml 文件,然后转化为配置对象,这种方式当然可以解决所有问题,但实现起来比较繁琐,特别是是在配置非常复杂的时候,解析工作是一个不得不考虑的负担。Spring 提供了可扩展 Schema,配置步骤如下:

2.1.1 设计配置属性和JavaBean

2.1.2 编写XSD文件

Spring用xsd文件校验xml文件格式。校验方法:Spring默认在启动时是要加载XSD文件来验证xml文件的,所以如果有的时候断网了,或者一些开源软件切换域名,那么就很容易碰到应用启动不了。为了防止这种情况,Spring提供了一种机制,默认从本地加载XSD文件。

2.1.3 编写NamespaceHandler和BeanDefinitionParser

完成解析工作,会用到NamespaceHandler 和 BeanDefinitionParser。NamespaceHandler 会根据 schema 和节点名找到某个 BeanDefinitionParser,然后由BeanDefinitionParser 完成具体的解析工作。因此需要分别完成 NamespaceHandler 和BeanDefinitionParser 的实现类,Spring 提供了默认实现类 NamespaceHandlerSupport 和AbstractSingleBeanDefinitionParser,简单的方式就是去继承这两个类。

2.1.4 编写spring.handlers和spring.schemas

放在META-INF文件夹,串联所有部件,让应用感知到。这两个文件需要开发者编写并放入 META-INF 文件夹中,这两个文件的地址必须是 META-INF/spring.handlers 和 META-INF/spring.schemas,spring 会默认去载入这两个文件。

2.1.5 在Bean文件中应用

与配置一个普通的Spring Bean类似,只是需要注意使用的是自定义的Schema。配置好后,再通过Spring容器的getBean(beanName)方法来使用自定义的bean。

2.2 dubbo中的实现方式

在dubbo的META-INF下,可以看到包含了很多文件,其中spring.handlers和spring.schemas就是上面提到的两个串联配置文件:

spring.handles:

代码语言:javascript
复制
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

spring.schemas:

代码语言:javascript
复制
http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd

三 Extension机制

即扩展点机制。

3.1 扩展点配置

3.1.1 根据关键字读取配置文件,获得具体的实现类

dubbo-demo/dubbo-demo-xml是dubbo源码的xml示例,其中dubbo-provider.xml的配置:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-provider" metadata-type="remote">
        <dubbo:parameter key="mapping-type" value="metadata"/>
    </dubbo:application>

    <dubbo:config-center address="zookeeper://127.0.0.1:2181"/>
    <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
    <dubbo:registry id="registry1" address="zookeeper://127.0.0.1:2181"/>

    <dubbo:protocol name="dubbo" port="-1"/>

    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
    <bean id="greetingService" class="org.apache.dubbo.demo.provider.GreetingServiceImpl"/>

    <dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/>
    <dubbo:service version="1.0.0" group="greeting" timeout="5000" interface="org.apache.dubbo.demo.GreetingService"
                   ref="greetingService"/>

</beans>

其中,<dubbo:service interface="org.apache.dubbo.demo.DemoService" timeout="3000" ref="demoService" registry="registry1"/> 这段配置就会根据registry去获取指定的Service。

3.1.2 注解@SPI和@Adaptive

@SPI注解:

在Protocol接口中,定义默认协议为dubbo,就是通过@SPI("dubbo")来实现的:

代码语言:javascript
复制
@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

@Adaptive注解:

这个注解打在接口方法上,通过ExtensionLoader.getAdaptiveExtension()方法获取设配类,会先通过前面的过程生成Java的源代码,再通过编译器编译成class加载。但Compiler的实现策略选择,也是通过ExtensionLoader.getAdaptiveExtension(),如果也通过编译器编译成class会导致死循环?

再分析一下ExtensionLoader.getAdaptiveExtension(),对于实现类上标记了注解@Adaptive的dubbo spi扩展机制,它获取设配类不是生成设配类的Java源代码,而是在读取扩展文件的时候,遇到实现类打了@Adaptive就把这个类作为设配类缓存在ExtensionLoader中,调用时直接返回。

3.1.3 filter和listener

在生成具体的实现类对象时,不是直接读取类文件,而是在读取类文件的基础上,通过filter和listener去封装类对象。

四 代理

4.1 代理生成方式

大家熟知的应该有JDK、cglib这两种方式,在Spring框架中做了这两种动态代理的支持。不过除此之外,Java中海油Javassist库动态代理、Javassist库动态字节码代理。代理之间的区别可参见Java动态代理机制详解(JDK 和CGLIB,Javassist,ASM)

4.2 dubbo中的代理

在dubbo的rpc->proxy包下,我们可以看到javassist、jdk、wrapper这三个包。

其中几个重点类,JdkProxyFactory,JavassistProxyFactory这两个代理工厂类是继承AbstractProxyFactory抽象类,而AbstractProxyFactory这个抽象类又实现了ProxyFactory接口,StubProxyFactoryWrapper则是对ProxyFactory的直接实现。类之间关系如下:

4.3 Invoke调用

先上一张图:

ReferenceConfig类比较重要的是init()方法,在这里做了大量工作。我们关注的是ref = createProxy(map);再继续向下跟进,这个方法会创建一个service proxy并返回:

代码语言:javascript
复制
return (T) PROXY_FACTORY.getProxy(invoker);

这里的PROXY_FACTORY也是通过扩展点机制获取的:

代码语言:javascript
复制
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

关于ProxyFactory的配置,会涉及到META-INF/dubbo.internal/org.apache.dubbo.rpc.ProxyFactory这个文件,内容为:

代码语言:javascript
复制
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory

这里指定了我们前面提到的三种ProxyFactory实现类。PROXY_FACTORY默认走的是JdkProxyFactory,所以PROXY_FACTORY.getProxy(invoker);实际使用的方法就是下面的这段(AbstractProxyFactory):

代码语言:javascript
复制
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

在InvokerInvocationHandler类,阐述了代理的具体工作方式。贴出源码:

代码语言:javascript
复制

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

这里最重要的是最后一句,调用了Invoker接口(实现类)的invoke方法。根据关系Invoker=>AbstractInvoker=>DubboInvoker,上面的invoker.invoke()使用的是AbstractInvoker的invoke:

代码语言:javascript
复制
@Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addAttachments(contextAttachments);
        }

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
    }

DubboInvoker的doInvoke方法:

代码语言:javascript
复制
 @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在这里,invoke函数最终会转为网络调用。

RpcInvocation的构造函数中,包含了客户端传递给invoker的信息:

代码语言:javascript
复制
public RpcInvocation(Invocation invocation, Invoker<?> invoker) {
        this(invocation.getMethodName(), invocation.getParameterTypes(),
                invocation.getArguments(), new HashMap<String, String>(invocation.getAttachments()),
                invocation.getInvoker());
        if (invoker != null) {
            URL url = invoker.getUrl();
            setAttachment(PATH_KEY, url.getPath());
            if (url.hasParameter(INTERFACE_KEY)) {
                setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            }
            if (url.hasParameter(GROUP_KEY)) {
                setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            }
            if (url.hasParameter(VERSION_KEY)) {
                setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY, "0.0.0"));
            }
            if (url.hasParameter(TIMEOUT_KEY)) {
                setAttachment(TIMEOUT_KEY, url.getParameter(TIMEOUT_KEY));
            }
            if (url.hasParameter(TOKEN_KEY)) {
                setAttachment(TOKEN_KEY, url.getParameter(TOKEN_KEY));
            }
            if (url.hasParameter(APPLICATION_KEY)) {
                setAttachment(APPLICATION_KEY, url.getParameter(APPLICATION_KEY));
            }
        }
    }

至此,我们理清了invoker.invode的调用过程。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 系列文章
  • 一 概述
  • 二 Bean加载
    • 2.1 Spring的可扩展Schema
      • 2.1.1 设计配置属性和JavaBean
      • 2.1.2 编写XSD文件
      • 2.1.3 编写NamespaceHandler和BeanDefinitionParser
      • 2.1.4 编写spring.handlers和spring.schemas
      • 2.1.5 在Bean文件中应用
    • 2.2 dubbo中的实现方式
    • 三 Extension机制
      • 3.1 扩展点配置
        • 3.1.1 根据关键字读取配置文件,获得具体的实现类
        • 3.1.2 注解@SPI和@Adaptive
        • 3.1.3 filter和listener
    • 四 代理
      • 4.1 代理生成方式
        • 4.2 dubbo中的代理
          • 4.3 Invoke调用
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档