专栏首页开发架构二三事dubbo源码之Proxy、Transporter和Exchanger执行过程

dubbo源码之Proxy、Transporter和Exchanger执行过程

对于dubbo的provider和consumer,通过不同的配置项会进行不同的操作,provider会对应创建ExchangeServer,而consumer端对应创建ExchangeClient。而根据实现的不同server和client也用对应的方式工作,比如dubbo默认使用的实现是nettyServer和nettyClient。这一切加载流程是如何工作的呢?这个问题就是本文要解决的重点。

一般配置项:

provider:
<dubbo:application name="appA" />
<!--使用zookeeper注册中心暴露服务地址 --><dubbo:registry address="${zookeeper.url}" ></dubbo:registry><dubbo:protocol name="dubbo" port="-1" />
<dubbo:provider filter="MDCFilter,DubboExceptionFilter,-exception" delay="-1" timeout="7000" retries="0" /><!-- 让监控中心进行统计 --><dubbo:monitor protocol="registry" />
<!-- 使用注解方式暴露接口 -->     <dubbo:annotation package="com.user"/>
consumer: <dubbo:application name="appB"></dubbo:application><!--    注意这里的traceFilterc必须放在HystrixFilter前--><dubbo:consumer filter="HystrixFilter,MDCFilter,DubboExceptionFilter,-exception" timeout="10000"                check="false"/>
<!-- 使用注解方式暴露接口 缺损package时,默认扫描全包 --><dubbo:annotation/>
<dubbo:registry address="${zookeeper.url}"></dubbo:registry>

provider对应的是com.alibaba.dubbo.config.ProviderConfig中的属性值; consumer对应的是com.alibaba.dubbo.config.ConsumerConfig中的属性值。

1. 解析入口(DubboNamespaceHandler):

解析部分com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, java.lang.Class, boolean):

从上面代码来看,整个流程就相当清晰了,DubboNamespaceHandler中加载的顺序是(这里只关注provider、consumer和service)先ProviderConfig再ConsumerConfig,再ProtocolConfig,之后才是ServiceBean。在解析完provider,consumer和protocol之后还会把这些节点当成serviceBean继续进行相应的解析,也就是说真正的解析过程都是在serviceBean中。包括provider和consumer中配置的filter属性也是会当做ServiceBean进行解析,解析时对应的protocol是:

在provider、consumer、registry、filter等作为ServiceBean来解析和进行相应的操作时,对应的ServiceBean的protocol属性是不一样的。这里需要提一点的是filter对应的是ProtocolFilterWrapper,而Consumer和Provider对应的是DubboProtocol

需要注意的是com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。

2. dubbo的ServiceBean(注意找ServiceConfig中解析的部分与xml配置项标签的对应关系):

  • 接下来我们重点分析下:com.alibaba.dubbo.config.ServiceConfig#doExportUrls:
   private void doExportUrls() {        List<URL> registryURLs = loadRegistries(true);        for (ProtocolConfig protocolConfig : protocols) {//使用的协议,这里以dubbo协议为例            doExportUrlsFor1Protocol(protocolConfig, registryURLs);        }    }
  • 它的com.alibaba.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol方法:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {    ...    String scope = url.getParameter(Constants.SCOPE_KEY);            //配置为none不暴露            if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {                    //本地                    exportLocal(url);                }                //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){                    if (logger.isInfoEnabled()) {                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);                    }                    if (registryURLs != null && registryURLs.size() > 0                            && url.getParameter("register", true)) {                        for (URL registryURL : registryURLs) {                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));                            URL monitorUrl = loadMonitor(registryURL);                            if (monitorUrl != null) {                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());                            }                            if (logger.isInfoEnabled()) {                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);                            }                            //暴露时先获取invoker                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            Exporter<?> exporter = protocol.export(invoker);                            exporters.add(exporter);                        }                    } else {                        //暴露时先获取invoker                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                        Exporter<?> exporter = protocol.export(invoker);                        exporters.add(exporter);                    }                }            }            this.urls.add(url);
}
exportLocal方法:private void exportLocal(URL url) {        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {            URL local = URL.valueOf(url.toFullString())                    .setProtocol(Constants.LOCAL_PROTOCOL)                    .setHost(NetUtils.LOCALHOST)                    .setPort(0);
            // modified by lishen            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));            //暴露时先获取invoker            Exporter<?> exporter = protocol.export(                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));            exporters.add(exporter);            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");        }    }

在进行服务暴露时,先获取invoker,然后获取exporter,并将exporter添加到ServiceConfig的List<exporter</exporter

关于暴露url成Service部分这里不去深究了,这里dubbo会把每个url暴露成服务(Service),在dubbo控制台上看时就可以看到每个服务对应一大串的url。

3. server部分的初始化

  • 我们接着来看看上面源码中的proxyFactory和invoker部分: proxyFactory的类继承结构图为:

这里我们主要看下JdkProxyFactory:

 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));    }
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {        return new AbstractProxyInvoker<T>(proxy, type, url) {            @Override            protected Object doInvoke(T proxy, String methodName,                                       Class<?>[] parameterTypes,                                       Object[] arguments) throws Throwable {                Method method = proxy.getClass().getMethod(methodName, parameterTypes);                return method.invoke(proxy, arguments);            }        };    }

主要用于生成接口实现类的代理对象,并生成一个携带doInvoke方法的AbstractProxyInvoker。

  • 对于protocol.export我们主要看一下com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#export: DubboProtocol里的方法如下:

可见它包括创建Client和创建Server的相关方法,其中它的export方法如下:

图中标红的部分为openServer方法,也就是provider创建server的部分:

private void openServer(URL url) {        // find server.        String key = url.getAddress();        //client 也可以暴露一个只有server可以调用的服务。        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);        if (isServer) {            ExchangeServer server = serverMap.get(key);            if (server == null) {                serverMap.put(key, createServer(url));            } else {                //server支持reset,配合override功能使用                server.reset(url);            }        }    }
    private ExchangeServer createServer(URL url) {        //默认开启server关闭时发送readonly事件        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());        //默认开启heartbeat        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));        //public static final String  DEFAULT_REMOTING_SERVER            = "netty";        //这里默认使用的是netty        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);        //这里默认使用的是NettyTransporter        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;        try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();            if (!supportedTypes.contains(str)) {                throw new RpcException("Unsupported client type: " + str);            }        }        return server;    }
  • openServer为创建ExchangeServer的方法,其中传入的是createServer方法产生的一个ExchangeServer,这个ExchangeServer是一个装饰器模式装饰过的对象
  • 由上面代码可以知道,dubboProtocol默认使用的是nettyTransporter:
  • ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)是走SPI拓展机制的,可以是自定义的,但默认的是nettyTransporter。
  • 生成的是将NettyServer进行装饰过的ExchangeServer,对应代码:
 try {    server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}

com.alibaba.dubbo.remoting.exchange.Exchangers#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.exchange.ExchangeHandler):

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handler == null) {        throw new IllegalArgumentException("handler == null");    }    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).bind(url, handler);}
public static Exchanger getExchanger(URL url) {        //com.alibaba.dubbo.common.Constants#DEFAULT_EXCHANGER        // public static final String  DEFAULT_EXCHANGER                  = "header";        // 默认用的echanger是header        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);    return getExchanger(type);}
public static Exchanger getExchanger(String type) {    //这里是加载Exchanger的spi拓展点信息    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}

加载Exchanger并通过type进行匹配,默认是使用的是HeaderExchanger

最终getExchanger(url).bind(url, handler)对应的方法是:

最终是将传入的ExchangeHandler装饰成HeaderExchangeHandler,然后将HeaderExchangeHandler装饰成DecodeHandler。

这里可以看到,server也是装饰器模式,最终工作的是NettyServer。到这里Server的初始化流程也就结束了。

4. client端的初始化

文章开头部分我们知道com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#refer方法:

 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {            return protocol.refer(type, url);        }        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);    }

可以看到它调用了protocol.refer方法,那么com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#refer方法如下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // modified by lishen    optimizeSerialization(url);
    // create rpc invoker.    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);    invokers.add(invoker);    return invoker;}

在这里面会调用com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients方法:

private ExchangeClient[] getClients(URL url){    //是否共享连接    boolean service_share_connect = false;    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);    //如果connections不配置,则共享连接,否则每服务每连接    if (connections == 0){        service_share_connect = true;        connections = 1;    }
    ExchangeClient[] clients = new ExchangeClient[connections];    for (int i = 0; i < clients.length; i++) {        if (service_share_connect){            clients[i] = getSharedClient(url);        } else {            clients[i] = initClient(url);        }    }    return clients;}

继续看initClient:

接下来看client = Exchangers.connect(url ,requestHandler);的实现:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handler == null) {        throw new IllegalArgumentException("handler == null");    }    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).connect(url, handler);}

其中getExchanger方法部分与server部分相同,我们看看它的connect方法的实现:

这里最后返回的是一个HeaderExchangeClient,它的内部装饰的是NettyClient。到这里client端的初始化也全部结束了。

5. 总结

本文以DubboNamespaceHandler为入口对Server和Client的初始化以及初始化过程中涉及到的Proxy、Transporter和Exchanger进行了一个流程梳理。

本文分享自微信公众号 - 开发架构二三事(gh_d6f166e26398),作者:两个小灰象

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • elasticsearch之Roaring Bitmaps的结构

    如果你是刚刚接触搜索引擎,你可能会感到奇怪,构建搜索引擎中存储块的一个很重要的原因是搜索引擎能够有效地压缩和快速解码有序的数字集合。 为什么这个很有用?你可能知...

    开发架构二三事
  • nginx之location指令

    localtion可以由前缀字符串或正则表达式定义。正则表达式使用前面的“〜*”修饰符(不区分大小写匹配)或“〜”修饰符(用于区分大小写匹配)指定。要找到匹配给...

    开发架构二三事
  • golang defer关键字的使用

    在golang当中,defer代码块会在函数调用链表中增加一个函数调用。这个函数调用不是普通的函数调用,而是会在函数正常返回,也就是return之后添加一个函数...

    开发架构二三事
  • Dubbo——服务发布原理

    在使用Dubbo的时候你一定会好奇它是怎么实现RPC的,而要了解它的调用过程,必然需要先了解其服务发布/订阅的过程,本篇将详细讨论Dubbo的发布过程。

    夜勿语
  • 多线程爬取 unsplash 图库

    我公众号文章的封面配图都在 Unsplash 上找的。因为 Unsplash 是一个完全免费的、无版权的高清图片资源网站。

    猴哥yuri
  • 几十行代码批量下载高清壁纸 爬虫入门实战

    电影桌面背景时间长了也会腻,换背景的话一般去网上下载又需要挑来挑去,一页一页的翻很烦人,能不能一次展示很多图片呢?省的一页一页看。比较简单的方法就是把图片存到本...

    啤酒单恋小龙虾
  • Python爬虫爬取美剧网站

    一直有爱看美剧的习惯,一方面锻炼一下英语听力,一方面打发一下时间。之前是能在视频网站上面在线看的,可是自从广电总局的限制令之后,进口的美剧英剧等貌似就不在像以前...

    小小科
  • Python之京东商品图片爬虫

    京东是我们购物经常去光顾的一个点上平台,它里面的商品多种多样,其中的商品图片也是应有尽有,今天小编呢就给大家带来一个京东商品图片的简单爬虫。

    用户6825444
  • python爬虫实战:爬取美剧网站

    Python现在非常火,语法简单而且功能强大,很多同学都想学Python!所以小的给各位看官们准备了高价值Python学习视频教程及相关电子版书籍,都放在了文章...

    python学习教程
  • 美剧迷是如何使用Python的

    一直有爱看美剧的习惯,一方面锻炼一下英语听力,一方面打发一下时间。之前是能在视频网站上面在线看的,可是自从广电总局的限制令之后,进口的美剧英剧等貌似就不在像以前...

    IT派

扫码关注云+社区

领取腾讯云代金券