专栏首页shysh95Dubbo注册中心之Zookeeper(续)

Dubbo注册中心之Zookeeper(续)

本文主要讲解ZookeeperRegistry是如何实现注册和订阅功能。

上图是Dubbo注册中心的整体类图,基于接口的实现方式可以方便我们扩展注册中心的实现方式,下面简单介绍一下各个类的作用:

  1. RegistryService:定义了注册中心基本功能的接口类(包含注册和订阅)
  2. Registry:继承Node和RegistryService接口(从Java 8开始支持一个接口继承多个父接口)
  3. AbstractRegistry:抽象类,主要提供了对注册中心数据的文件缓存
  4. FailbackRegistry:抽象类,主要提供了注册失败或者订阅失败的重试机制
  5. ZookeeperRegistry:具体实现类,基于Zookeeper的注册中心
ZookeeperRegistry

下面我们看一下ZookeeperRegistry的具体实现。

注册
@Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

注册的逻辑很简单,就是简单在ZooKeeper中创建一个节点,默认创建的节点是一个临时节点(随会话的关闭自动删除)。

创建的节点路径格式为:{root}/{interface}/{category}/{protocol}://{username}:{password}@{host}:{port}/{path}?parameterKey=parameterValue&...

整个节点路径格式分为两大部分,一部分是父目录节点{root}/{interface}/{category},另一部分就是节点的名称{protocol}://{username}:{password}@{host}:{port}/{path}?parameterKey=parameterValue&...

# 第二部分示例
dubbo://127.0.0.1:20880/cn.sh.dubbo.user.service.UserService?anyhost=true&application=user-manager&application.version=1.0&bean.name=cn.sh.dubbo.user.service.UserService&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=cn.sh.dubbo.user.service.UserService&methods=login,register&pid=3551&qos.accept.foreign.ip=false&qos.port=22223&qosEnable=true&register=true&release=2.7.2&side=provider&timestamp=1563954893720
# 解释其中几个参数的来源
# bean.name:<dubbo:service>标签配置的id属性的值

下面讲解一下每个{}中的数据来源和默认值:

  1. root :默认值dubbo,可以通过<dubbo:registry>标签中的group属性进行指定
  2. interface :暴露的服务接口全量限定名称
  3. category :类别,默认值是providers,代表这是一条注册URL
  4. protocol :协议名称,默认值是dubbo,还可以支持rmi、hessian、http、webservice、thrift、memcached、redis和rest
  5. username:password :用户名和密码
  6. host:port :主机名(或者IP)和端口号
  7. path :默认为暴露的服务接口全量限定名称,也可以通过<dubbo:service>的path属性指定
取消注册
@Override
    public void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

取消注册就是将注册时创建的节点进行删除。

订阅
@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

订阅支持全局订阅和订阅具体某个接口,上述订阅的方法只展示了订阅某个接口的逻辑。简单描述一下其逻辑:

  1. 首先将URL转换为订阅的路径,然后查询缓存中是否存有该URL的订阅对象,如果没有,则新建一个缓存对象
  2. 紧接着从缓存对象根据NotifyListener获取其对应的Zookeeper的ChildListener,如果没有的话则新建一个ChildListener并将其放入缓存,并且ChildListener的实现其实就是调用了ZookeeperRegistry的notify方法
  3. 在创建完ChildListener以后,紧接着创建订阅的路径,这边是为了防止在订阅时该路径所代表的的节点还没创建,然后再订阅该路径的子节点变化
  4. 如果发现点阅的节点下面有子节点的话,进行一次通知,告知订阅者的所订阅的路径有多少个子节点,看一下notify的方法具体的实现
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
          // 省略了url、listener、urls的判断
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }

上述方法有三个参数,第一个参数为要URL(包含订阅的类型、订阅的服务等关键信息),第二个NotifyListener在订阅服务这一模块的具体实现类是RegistryDirectory,第三个是订阅的路径的子节点路径列表。整体逻辑如下:

  1. 判断一下子节点路径列表是不是订阅者真正关心的内容,如果是关心的内容,就按照订阅的类型进行区分
  2. 在进行校验完成以后,按照类别进行通知,并将通知的类别进行缓存,下面看一下NotifyListener怎么通知订阅者信息变更的
@Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }

从上面的源码中我们可以大体看出,根据URL解析每个类别(configurators、routers、providers),分别进行通知。

取消订阅
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                zkClient.removeChildListener(root, zkListener);
            } else {
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}

取消订阅的逻辑就比较简单了,将相应的ChildListener进行移除就好了。

上述内容只是本人的一些个人见解,最后附上一张Dubbo的架构图(来源:官网),上述内容只是简略的讲述了红框中的部分内容。

本文分享自微信公众号 - shysh95(shysh95),作者:shysh95

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-07-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Zookeeper Java API

    本文主要讲解使用Java API来和Zookeeper集群进行交互,大家在看完这篇文章以后一定要亲自动手去敲代码(纸上得来终觉浅,绝知此事要躬行)。下面介绍的A...

    shysh95
  • ZkClient使用

    使用原生的Zookeeper API来与Zookeeper服务端进行交互还是比较繁琐复杂的,为了简化这些操作,就诞生了一些封装客户端。这些客户端除了简单易用,还...

    shysh95
  • Java Class文件常量池

    Class文件的存在使得不同语言编写的程序都可以运行在Java虚拟机上,只需要这些语言经过编译器编译后的Class文件符合Java虚拟机定义的规范,Java虚拟...

    shysh95
  • 小程序给我们带来了哪些改变

    以前你的手机里可能安装了很多APP,有些APP根本就不常使用,但如果卸载了又可能用得上,重复安装显然很麻烦。所以你只能留着这些使用频率低的APP,任由它们占据手...

    中微信通
  • 微信文章太多,看不过来?这个「女生」来念给你听

    对于大多数人来说,打开微信收藏界面,里面或多或少都会躺着一些,因一时感兴趣而收藏着的,但是却再也没有打开过的微信文章。

    知晓君
  • 小程序会占用手机存储空间吗?| 小程序问答 #21

    首先,微信团队限制了小程序的体积:只有 2 MB 以内的小程序,才能提交到微信审核。

    知晓君
  • 一篇文章读懂微信小程序 | 观点

    知晓君
  • 小程序一周报 | 小程序游戏类目开放测试 / 朋友圈小程序广告全量上线

    极乐君
  • 微信小程序助力实体店对接互联网市场

    互联网+的概念正在逐渐的深入,实体商家此次转战互联网市场,对接移动端红利,除了开发APP,入驻个大类平台之外,开发微信小程序将是实体店商家一个更好的选择。 ? ...

    企鹅号小编
  • 小程序——实体行业探索的新方向!

    有没有发觉你的手机微信群里时不时就收到一个微信小程序?然而小程序不是一下子就被大家所熟知的,任何新生事物,其影响都是一步步扩大而来的。当年我们还不清楚什么是公众...

    用户1745481

扫码关注云+社区

领取腾讯云代金券