前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码学习--服务发布(DubboProtocol、Exporter)

Dubbo源码学习--服务发布(DubboProtocol、Exporter)

作者头像
YGingko
发布2017-12-28 14:28:57
2.6K0
发布2017-12-28 14:28:57
举报
文章被收录于专栏:海说海说

Dubbo服务发布的整体流程一文中,只是分析了服务发布的整体流程,具体的细节还没有进一步分析。本节将继续分析服务暴露的过程。在ServiceConfig中通过一句话即可暴露服务,如下:

代码语言:javascript
复制
Exporter<?> exporter = protocol.export(invoker);

此时Invoker对象携带的URL信息中定义的是"registry",则此处"protocol"加载的是RegistryProtocol对象。也即调用RegistryProtocol的export方法处理Invoker。

RegistryProtocol实现了Protocol接口。Protocol接口是一个顶级接口,定义内容比较简单,如下:

代码语言:javascript
复制
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

@SPI("dubbo")
public interface Protocol {

    /**
     * 获取缺省端口,当用户没有配置端口时使用。
     *
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露远程服务:<br>
     * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
     * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
     *
     * @param <T>     服务的类型
     * @param invoker 服务的执行体
     * @return exporter 暴露服务的引用,用于取消暴露
     * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务:<br>
     * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
     * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
     * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
     *
     * @param <T>  服务的类型
     * @param type 服务的类型
     * @param url  远程服务的URL地址
     * @return invoker 服务的本地代理
     * @throws RpcException 当连接服务提供方失败时抛出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 释放协议:<br>
     * 1. 取消该协议所有已经暴露和引用的服务。<br>
     * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
     * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
     */
    void destroy();

}

通过中文注释,很容易理解接口中定义的四个方法的功能。通过SPI注解,设置默认的实现类为DubboProtocol。实现类RegistryProtocol中方法比较多,但大多数都是private方法,public方法主要是一些setter和getter方法,还有就是Protocol接口中定义的方法实现。

在这里主要看下RegistryProtocol中export方法:

代码语言:javascript
复制
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

export方法的入参是Invoker实例,并且定义了final类型,表明在方法内部,Invoker实例不能被再修改。方法中首先对传入的Invoker进行暴露,具体方法如下:

代码语言:javascript
复制
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

doLocalExport方法中先检查服务是否已经暴露过,如果已经暴露则不再重复暴露,如果没有暴露,则执行暴露过程。

代码语言:javascript
复制
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

执行暴露的protocol是DubboProtocol实例。和RegistryProtocol一样,DubboProtocol也实现了Protocol接口,大多数方法也是private方法。不一样的是RegistryProtocol直接实现Protocol接口,而DubboProtocol是继承AbstractProtocol抽象类,AbstractProtocol实现Protocol接口。如下所示:

代码语言:javascript
复制
public class RegistryProtocol implements Protocol{}

public class DubboProtocol extends AbstractProtocol{}

public abstract class AbstractProtocol implements Protocol{}

直接来看DubboProtocol中export方法,源码如下:

代码语言:javascript
复制
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }

在发布服务之前,先根据要发布的服务产生与服务相对应的key值。key是一个字符串类型,格式为“group/ServiceName:version:port”。具体生成方法在ProtocolUtils工具类中定义了,如下所示:

代码语言:javascript
复制
    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && serviceGroup.length() > 0) {
            buf.append(serviceGroup);
            buf.append("/");
        }
        buf.append(serviceName);
        if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
            buf.append(":");
            buf.append(serviceVersion);
        }
        buf.append(":");
        buf.append(port);
        return buf.toString();
    }

生成key值之后,结合Invoker和exportMap生成服务暴露,然后将生成的服务暴露作为value值放入map中,从而实现服务发布。服务暴露是用DubboExporter封装的,DubboExporter类比较简单,源码如下:

代码语言:javascript
复制
package com.alibaba.dubbo.rpc.protocol.dubbo;

import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.protocol.AbstractExporter;

import java.util.Map;

public class DubboExporter<T> extends AbstractExporter<T> {

    private final String key;

    private final Map<String, Exporter<?>> exporterMap;

    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }

    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }

}

DubboExporter引入了泛型,继承了AbstractExporter抽象类,AbstractExporter又实现了Exporter接口。抽象类AbstractExporter和接口Exporter都比较简单,这里就不再叙述,请自行查阅。

DubboProtocol的export方法中需要重点看下exporterMap属性。exporterMap是在AbstractProtocol抽象类中定义的,如下:

代码语言:javascript
复制
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

exporterMap是Map类型,精确的说,是ConcurrentHashMap类型。ConcurrentHashMap是一个高并发的HashMap容器。发布的服务就存储在这里,通过key值可以得到服务发布DubboExporter,再根据DubboExporter的getInvoker方法得到服务调用,从而调用服务。具体的服务调用细节将在后续文章中分析,敬请期待。

以上就是Dubbo发布服务的整个过程,过程中各种封装、调用、继承、多态应用得淋漓尽致。大体的流程是:i. 应用Spring Schema机制读入provider配置文件信息;ii. 将读入后的信息组装成URL对象;iii. 根据组装后的URL对象创建服务调用Invoker;iv. 根据服务调用Invoker创建服务发布Exporter及相应key值;v. 将key值和Exporter装入ConcurrentHashMap中,实现发布服务功能。在这个过程中,Dubbo自定义的URL对象发挥着重要作用,将provider配置文件、Invoker、Exporter串联了起来。

分析完了Dubbo发布服务的过程,之后将继续分析Dubbo服务的注册过程

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-12-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档