前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >dubbo学习(六)服务发布-dubbo服务在zk的创建、订阅

dubbo学习(六)服务发布-dubbo服务在zk的创建、订阅

作者头像
虞大大
发布2020-10-28 17:13:58
1.3K0
发布2020-10-28 17:13:58
举报
文章被收录于专栏:码云大作战码云大作战

一、服务端provider发布流程回顾

根据dubbo启动日志,provider的发布动作为以下几个步骤:

(1)暴露本地服务

Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。

(2)暴露远程服务

Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。

(3)启动netty

Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。

(4)打开zk

Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。

(5)注册provider服务到zk

Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。

to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。

(6)监听zk(订阅与通知)

Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

· 服务发布的目的

解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。

代码语言:javascript
复制
<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
               cluster="failfast" timeout="3000"/>

上篇文章已经提到zookeeper是如何被初始化与连接的,这一篇主要分析下dubbo服务是如何在zookeeper上进行节点创建与他们的订阅关系。

二、dubbo节点如何创建在zookeeper

创建dubbo节点是建立在远程服务暴露的源码基础上:

代码语言:javascript
复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    //初始化注册信息、连接zk
代码语言:javascript
复制
    final Registry registry = getRegistry(originInvoker);
    //注册服务 - 即在注册中心创建dubbo服务节点
    register(registryUrl, registeredProviderUrl);    //...}
代码语言:javascript
复制
public void register(URL registryUrl, URL registedProviderUrl) {    //获取注册信息
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

先获取注册信息,由于在之前的代码中已经对注册信息初始化过,因此直接会获取缓存中的注册信息,进行dubbo节点创建。

创建dubbo节点核心代码:

代码语言:javascript
复制
public void register(URL url) {    //往注册队列中添加需要注册的服务
    super.register(url);
    //删除注册异常和未注册队列中的服务    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        doRegister(url);
    } catch (Exception e) {
        //出现异常,则添加到注册异常队列中
        failedRegistered.add(url);
    }
}//path为服务端api路径与参数信息
//ehemeral为是否持久化 - 默认为true
代码语言:javascript
复制
public void create(String path, boolean ephemeral) {
    //...
    if (ephemeral) {        //创建临时节点
        createEphemeral(path)    } else {        //创建持久节点        createPersistent(path);
    }
}

上述的path即服务端api路径与注册信息:

以上为dubbo的服务端节点创建过程,将注册服务放入到注册队列,最后通过注册对象创建节点,创建临时节点,注册完毕。

· createEphemeral与createPersistent

createEphemeral表示临时节点,他与客户端会话绑定,一但服务端服务被关闭或会话失效,那么这个客户端所创建的临时节点都会被删除。

createPersistent表示持久化节点,表示服务节点一但被创建,除非触发主动删除,否则一直存储在ZK中。

因此服务端服务如果被关闭,那么所创建的节点没有必要继续存在zk中,否则客户端还会不断来进行订阅,因此在dubbo服务节点的创建中,采用了临时节点的创建来处理。

但是,对于dubbo服务中的providers/configuration/routes等信息会被作为持久化节点来创建。具体节点信息如下图所示。

三、dubbo服务的订阅

dubbo服务的订阅是建立在远程服务暴露的源码基础上:

代码语言:javascript
复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    //初始化注册信息、连接zk
代码语言:javascript
复制
    final Registry registry = getRegistry(originInvoker);
    //注册服务 - 即在注册中心创建dubbo服务节点
    register(registryUrl, registeredProviderUrl);    //...    //订阅
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    return new DestroyableExporter<T>(exporter, originInvoker,                    overrideSubscribeUrl, registeredProviderUrl);}

订阅相关代码:

代码语言:javascript
复制
public void subscribe(URL url, NotifyListener listener) {    //将订阅服务对象 加入到订阅队列中
    super.subscribe(url, listener);
    //移除订阅失败队列    removeFailedSubscribed(url, listener);
    try {        //...
        //向服务器发送订阅请求
        doSubscribe(url, listener);
    } catch (Exception e) {        //...
        //出现异常则添加到订阅失败队列中
        addFailedSubscribed(url, listener);
    }
}
代码语言:javascript
复制
protected void doSubscribe(final URL url, final NotifyListener listener) {
        //...
} else {            //子结点数据集合
代码语言:javascript
复制
            List<URL> urls = new ArrayList<URL>();
            //将url拆分对应的监听集合,如router/configuration/provider
            for (String path : toCategoriesPath(url)) {                //监听集合中获取
                ConcurrentMap<NotifyListener, ChildListener> listeners                            = zkListeners.get(url);
                //获取不到,则重新new一个,放到监听集合中                if (listeners == null) {
                zkListeners.putIfAbsent(url,                    new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }                //从集合中,获取监听订阅对象
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {                    //获取不到 创建一个监听
                listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                        //订阅配置信息 执行notify方法,主要用于收到订阅后的处理                          notify(url, listener,                            toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }                //创建一个订阅的节点 - 为持久化节点
                zkClient.create(path, false);
                //加入到监听队列中                List<String> children = zkClient.addChildListener(path,                                                 zkListener);                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }            //全量数据dubbo服务订阅处理
            notify(url, listener, urls);
代码语言:javascript
复制
        }
   //...
}

加入到监听队列中的方法-主要用于收到订阅后的处理,如:删除节点、修改节点、添加子节点。

以上为dubbo服务的订阅,总结主要分为三个步骤:

(1)创建持久化dubbo配置节点,即/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。

(2)加入订阅/dubbo/com.ywl.dubbo.TestApi/configurators或routes或providers。

(3)收到订阅后的处理。

四、收到订阅后的处理-notify

代码语言:javascript
复制
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //...    //已经通知过的列表
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {        //对订阅的结点-configuration\providers等进行遍历
        String category = entry.getKey()        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //把服务端的注册url信息更新到本地dubbo缓存中
        saveProperties(url);
        listener.notify(categoryList);
    }
}

listener.notify的核心代码:

代码语言:javascript
复制
public synchronized void notify(List<URL> urls) {    //分别存放三种类型的集合
List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    //更新服务端configuration配置信息
    if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    //更新缓存的路由配置信息
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { 
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    //重建invoker实例
    refreshInvoker(invokerUrls);
}

notify方法主要是对订阅的服务端的configuration、routes配置信息进行更新,最后重新生成服务提供api的invoker实例,执行完毕。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码云大作战 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档