专栏首页Java艺术Dubbo分层架构之服务注册中心层的源码分析(下)

Dubbo分层架构之服务注册中心层的源码分析(下)

基于Redis实现的注册中心为什么不被推荐使用,你知道原因吗?

由于我在实际项目中并未使用Redis作为服务注册中心,所以一直没有关注这个话题。那么,使用Redis作为服务注册中心有哪些缺点,希望本篇文章能给你答案。

首先,源码中使用了keys命令扫描以"/通信协议/接口名/"开头的key,所以,不要将业务使用的Redis集群作为注册中心,必须要独立部署一个Redis集群,否则会把项目搞挂。

其次,基于Redis实现的注册中心,是利用其发布/订阅的特性,每个接口的提供者、消费者都是使用一个hsah结构存储的,field为具体的提供者或消费者的url,而value则为过期时间。默认过期时间为当前时间加上60秒,因此需要开启一个定时任务定期的更新过期时间。在服务非正常下线时,服务提供者就没办法发送一个事件,因此只有当该Service其它提供者或消费者定时更新过期时间时向此key发布一个事件,消费者才能感知到。

综上,就是Redis注册中心不被推荐使用的原因。本篇文章继续分析基于Redis实现的注册中心其注册与订阅流程,只要了解服务注册需要做什么,以及订阅到事件之后要做什么,整个服务注册中心层我们就了解了,也能自己手写一个注册中心。

RegistryProtocol如何调度服务注册与订阅

继上篇,我们分析到了服务在导出(export)与引入(refer)时,由自适应扩展点机制取得RegistryProtocol,之后的事情就交由RegistryProtocol去完成了。

public class RegistryProtocol implements Protocol {
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    }
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    }
}

01

服务导出的流程图与引入的流程图在上篇文章已经给出,因此,我们直接分析代码,先分析服务导出(export),即RegistryProtocol的export方法。

RegistryProtocol的export方法做的事情大体分为4步。第一步是获取NotifyListener,这些NotifyListener将在注册中心有事件回调时被回调执行;第二步导出服务,即调用DubboProtocol的export方法,在此不做深入分析;第三步才是将服务注册到注册中心,判断是否需要注册到注册中心,可在配置文件中配置,比如本地Debug时不需要将服务注册到注册中心,此时可以配置为false;注册完成之后就是第四步,开始订阅。那参数originInvoker是什么?

Export方法的originInvoker参数包装了真实的Invoker与导出服务的元数据。Invoker就是能够被远程消费者调用的接口的封装(代理),它持有Service的实现类实例,如DemoServiceImpl。在上篇我们分析过,此时的Invoker的url最外层包上了注册中心的url,所以在export方法中可以根据Invoker拿到实际的注册中心的url以及服务提供者的url。

(RegistryProtocol的export传入的originInvoker参数的来源)

举个栗子来理解“Invoker的url最外层包装了注册中心的url”这句话,假设服务提供者的url为

dubbo://10.1.0.251:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-annotation-provider
&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService
&bind.ip=10.1.0.251&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=81839&register=true&release=&side=provider&timestamp=1576309881128

将服务提供者的url经过URLEncode后为

dubbo%3A%2F%2F10.1.0.251%3A20880%2Forg.apache.dubbo.demo.DemoService%3F
anyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider
%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D10.1.0.251%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D81839%26register%3Dtrue%26release%3D%26side%3Dprovider%26timestamp%3D1576309881128&pid=81839&timestamp=1576309876113

注册中心的url为

regist://127.0.0.1:6379/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2

在调用RegistryProtocol的export方法之前,并不关心是使用何种注册中心,分层的设计各层有各层的职责,因此此时的注册中心的协议为“regist”,也因此Dubbo才能通过Protocol的自适应扩展点机制拿到RegistryProtocol。

registry=org.apache.dubbo.registry.integration.RegistryProtocol

将提供者的url包装上注册中心url之后,就变成

redis://127.0.0.1:6379/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2
&export=dubbo%3A%2F%2F10.1.0.251%3A20880%2........

偷梁换柱,绕来绕去,目的就是为了将两个url合并为一个url,让注册中心实现服务的导出与引入,非常巧妙的包装,这也是值得学习的地方。包装,其实就是代理,在Dubbo中用得非常多,比如将Invoker包了一层又一层,最后又巧妙的利用包装将Invoker变为Exporter,每层包装都为实现各层的职责,竟没想到,这种包装还能用到字符串上,佩服。

02

服务引入 (refer)与导出(export)是有很大区别的,服务导出似乎少了订阅的解析,是的,因为服务导出只关心配置的改变事件,在此我不打算往下分析。继承看RegistryProtocol的refer方法。

Refer方法有两个参数,第一个是引用的远程服务的接口类型,第二个与export方法的Invoker参数封装的url一样,都是包装上注册中心的。所以,第一步先是将url的"register://"替换为“redis://”,从注册器工厂中通过SPI获取到Redis注册中心的注册器RedisRegistry;接着就是从注册中心url中提取消费者将要注册到注册中心的url,解析url取得所有参数,根据group参数决定使用何种Cluster,关于Cluster本篇不深入分析。最后都是调用doRefer方法完成服务引入。

这里引入了一个新的类RegistryDirectory,每个接口(Service)对应一个Directory,持有注册中心的引用以及rpc协议(DubboProtocol)的引用,所以服务的订阅是委托给RegistryDirectory实现的。服务的引入依然是先将当前消费者注册到注册中心,再开始订阅。

public class RegistryDirectory<T> extends AbstractDirectory<T> 
     implements NotifyListener {
}

RegistryDirectory实际上也是一个NotifyListener,它需要订阅注册中心事件,更新自身缓存的服务提供者目录,既然RegistryDirectory是一个NotifyListener,那么RegistryProtocol干脆把订阅的实现交给它实现就行了。

上篇提到注册中心扩展契约,通过在服务提供者与服务消费者的url上添加category约定服务提供者只订阅配置改变事件,服务消费者订阅配置改变、路由改变、服务提供者改变事件,在源码截图的第三个框中,调用RegistryDirectory的subscribe委拖订阅的实现,就是给消费者的url加上category。

继续看RegistryDirectory的subscribe方法的实现。除了添加多几个监听器之外,就是调用注册中心注册器RegistryService的subscribe方法。

public void subscribe(URL url) {
        .......
        // 调用注册中心的订阅方法
        registry.subscribe(url, this);
}

订阅注册中心事件所注册的监听器NotifyListener是RegistryDirectory,在不考虑注册中心如何实现订阅功能之前,先看下当RegistryDirectory监听到事件时都做了什么。

 @Override
    public synchronized void notify(List<URL> urls) {
       // 先将url分类(category)
        Map<String, List<URL>> categoryUrls = urls....
        // configurator
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
        // router
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);
        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }
    
    // 更新或添加新的服务提供者Invoker
    private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

一句话概括就是更新配置、更新路由、更新服务提供者,具体不做分析。

RedisRegistry源码详细分析

通过前面的分析,我们知道,要添加一种注册中心的支持,我们要做的就是遵守契约,实现RegistryService接口的注册与订阅方法,其它的都不需要关心。而为了封装通用的逻辑,如提供注册失败重试功能,Dubbo提供了一个FailbackRegistry,因此,我们一般通过继承FailbackRegistry免去大量的工作,而只专注服务注册与订阅的实现。

FailbackRegistry其实就是通个一个定时器去延时重试注册,本篇不会去分析FailbackRegistry。RedisRegistry也不例外,它也是继承FailbackRegistry。

public class RedisRegistry extends FailbackRegistry {
}

RedisRegistry继承父类FailbackRegistry,因此它不是直接实现RegistryService的register、unregister、subscribe、unsubscribe方法,而是实现父类提供的与之对于的抽象方法doRegister、doUnregister、doSubscribe、doUnsubscribe。其中doUnregister、doUnsubscribe方法本篇不做分析。

RedisRegistry的构造方法初始化Redis的连接池,我们并不关心这些,但有个定时任务不能忽略。

 this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
 this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> 
            deferExpired(), 
            expirePeriod / 2, 
            expirePeriod / 2, TimeUnit.MILLISECONDS);

定时更新过期时间,过期时间默认为60s,需要在过期之前更新过期时间,所以定时任务每隔30s执行一次。也因如此,当有服务提供者下线时,在30s后消费者才能感知到。

private void deferExpired() {   
   for (URL url : new HashSet<>(getRegistered())) {
        ......
        // key===> 如:/dubbo/org.apache.dubbo.demo.DemoService/providers
        // filed ===> 当前服务提供者|消费者的url
        // value ===> 新的过期时间,当前时间+60s
        if (jedis.hset(key, url.toFullString(), 
               String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
               jedis.publish(key, REGISTER);
        }
    }
 }

deferExpired方法先更新自己的过期时间,然后发布一个事件,所以每30s,所有服务提供者以及消费者都能订阅到一个事件。订阅到事件之后的逻辑稍后分析。由于每个服务提供者、消费者都需要开启一个定时任务去更新过期时间,且每次更新过期时间都会发布一个事件,这将会导致本地目录RegistryDirectory频率更新,当服务提供者与消费者越多,缺陷就越明显。

01

先看doRegister是如何将服务提供者或者消费者注册到Redis的,将demo的服务提供者以及消费者先跑起来,再通过redis-cli工具连接到Redis,看Redis存储了什么信息,怎么存储。

有强迫症的读者可能会看着keys这个命令不爽,那接下来看源码你会更不爽。服务注册到Redis都是以hash结构存储,key为"rpc协议/服务接口名/提供者|消费者"。

服务提供者的存储,field为提供者的url,value为过期时间,也是定时任务每隔30s要更新的。消费者与提供者的存在都是一样,field为消费者的url,value为过期时间。

我明明只启动了一个消费者,但是图中却有两个消费者,这是由于我前一次启动时,未正常退出遗留下来的,所以,使用redis作为注册中心,最好自己起个定时任务去删除历史遗留的数据。

doRegister将服务注册到Redis就是一条hset命令,随后立即发布一个注册事件,要注意的是key,对于服务提供者而言,key为"/dubbo/org.apache.dubbo.demo.DemoService/provider",对于服务消费者而言,key为"/dubbo/org.apache.dubbo.demo.DemoService/consumer"。

02

注册的逻辑很简单,相比注册,订阅的实现逻辑稍复杂些。截图的代码会比较长,所以我直接贴代码,去掉一些不是很重要的逻辑。

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
   // service ===> /dubbo/{服务接口名}/{provider|consumer}
   String service = toServicePath(url);
   for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
       JedisPool jedisPool = entry.getValue();
       try (Jedis jedis = jedisPool.getResource()) {
         // service ==> /dubbo/org.apache.dubbo.demo.DemoService/
         // ANY_VALUE ==> *
         // jedis.keys 获取的是/dubbo/org.apache.dubbo.demo.DemoService/* ,所以结果包括了服务提供者和消费者的key
         doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
         break; // Just read one server's data
       }
   }
 }

首次订阅,通过keys命令获取与此Service(org.apache.dubbo.demo.DemoService)相关的服务提供者与消费者的key。模拟一次事件回调通知,主要目的是让消费者能获取当前所有可用的服务提供者,更新本地缓存目录。所以弊端,不能使用业务用到的Redis集群作为注册中心。

doSubscribe方法的参数url为服务提供者的url “provider://”或服务消费者的url “consumer://”。如果是服务消费者,第二个参数listener就是RegistryDirectory。

在服务提供者或是消费者启动时,该方法被调用一次,但是在这个方法中,我们并没有看到任何订阅的实现,只是调用doNotify方法模拟一次事件通知。这是因为我去掉了一部分逻辑。

 @Override
 public void doSubscribe(final URL url, final NotifyListener listener) {
        // service ===> /dubbo/{服务接口名}/{provider|consumer}
        String service = toServicePath(url);
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            if (notifier == newNotifier) {
                //  开启线程
                notifier.start();
            }
        }
 }

Notifier是一个Thread,调用notifier.start()方法启动线程,在run方法中会往Redis注册一个订阅者。每个接口(Service)都开启一个线程。

// 如果是服务启动即为true
if (first) {
     // service ==> /dubbo/org.apache.dubbo.demo.DemoService
     first = false;
     doNotify(jedis, service);
}
// 添加订阅者,订阅key: /dubbo/org.apache.dubbo.demo.DemoService/*
jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE);

psubscribe命令订阅一个或多个符合给定模式的频道,此处不管是服务提供者还是消费者,key都是“/dubbo/org.apache.dubbo.demo.DemoService/*”,订阅到事件时处理逻辑在NotifySub。

除了服务首次注册到注册中心时,会发送一个注册事件外,还有一个定时更新过期时间的任务,每次更新过期时间后都会发布一个时间。doNotify方法比较多,我们拆分为两部分来看。

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
   List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
   // 获取服务接口(url可能是提供者,也可能是消费者)
   String consumerService = url.getServiceInterface();
   for (String key : keys) {
         // 从key中获取分类(/dubbo/{分类}),如果不存在url的category中跳过
         String category = toCategoryName(key);
         if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
              continue;
         }
         // 只有服务消费者会走到这里
         // ====【这里是第二部分代码】 =====
    }
    // 通知更新
    for (NotifyListener listener : listeners) {
            notify(url, listener, result);
    }
}

方法第二个参数keys,如果是服务提供者发布的事件,就是“/dubbo/org.apache.dubbo.demo.DemoService/provider”,如果是服务消费者发布的事件就是“/dubbo/org.apache.dubbo.demo.DemoService/consumer”,由于注册的订阅者订阅的key(channel)是“/dubbo/org.apache.dubbo.demo.DemoService/*”,所以不管服务提供者还是消费者都能感知到服务变更事件。

契约在for循环中实现,假设当前事件是某个服务消费者注册到注册中心后发布的一个注册事件,而当前服务是服务提供者,则不会做任何事情。而如果是消费者,则会进入到将要分析的第二部分代码。最后会回调所有监听器NotifyListener。

第二部分代码

 List<URL> urls = new ArrayList<>();
 // 获取所有服务(提供者|订阅者)
 Map<String, String> values = jedis.hgetAll(key);
 if (CollectionUtils.isNotEmptyMap(values)) {
      for (Map.Entry<String, String> entry : values.entrySet()) {
            URL u = URL.valueOf(entry.getKey());
            // 判断是否过期,过期说明服务掉线了
            if (!u.getParameter(DYNAMIC_KEY, true)
                || Long.parseLong(entry.getValue()) >= now) {
                    if (UrlUtils.isMatch(url, u)) {
                      // dubbo://10.1.0.164:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=81202&register=true&release=&side=provider&timestamp=1576237488706
                      // 将provider替换为dubbo(dubbo为通信协议)
                      urls.add(u);
           }
      }
 }

获取此事件的key下的所有注册信息,如获取所有服务提供者,并判断服务提供者是否过期,过滤掉过期的,获取到新增的,最后更新本地提供者目录缓存。

03

因为RegistryProtocol是通过SPI从配置文件中拿到注册器工厂RegistryFactory,再从RegistryFactory获取到注册器Registry的。除了提供一个Registry,还需要提供一个RegistryFactory,以及在Resources目录下添加一个org.apache.dubbo.registry.RegistryFactory配置文件。

[RedisRegistryFactory]

public class RedisRegistryFactory extends AbstractRegistryFactory {
    @Override
    protected Registry createRegistry(URL url) {
        return new RedisRegistry(url);
    }
}

[org.apache.dubbo.registry.RegistryFactory配置文件]

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory

通过两篇文章的分析,想必大家已经能自己动手实现。实现一个注册中心并不难,但实现一个适用于生产环境的注册中心,我是觉得没必要重复造轮子。

本文分享自微信公众号 - Java艺术(javaskill),作者:wujiuye

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-12-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • OpenFeign与Ribbon源码分析总结(面试题)

    从以往的源码分析文章的阅读量不难看出,大家都不太喜欢看源码分析类文章,一是枯燥,二是难看懂(可能也是笔者写得不好吧,源码分析类文章确实难写),所以本篇就是总结前...

    Java艺术
  • 学生时代,我写过的那些“项目”

    今天,我看了一遍我在github上开源的“项目”,本来是想删掉这些开源demo的,因为github总是发邮件提醒,xx项目报出漏洞,让我修复,就是更新依赖的ja...

    Java艺术
  • 深入理解Dubbo源码(三),Dubbo与Spring的整合之注解方式

    昨晚熬夜学习spark,有点兴奋过头,最后看视频看着看着就睡着了,醒来发现自己还在沙发上。还是要注意休息,周末不能太放纵了。本来今天还是想继续玩spark的,想...

    Java艺术
  • 圆点点二维码的识别

    用户6021899
  • C#操作Url参数

    跟着阿笨一起玩NET
  • 利用数据接口,用python玩转新冠病毒疫情数据

    昨天在在凹凸数据公众号读者群中看到一个分享,标题非常非常感兴趣,于是晚上抽空研究了一下。

    朱小五
  • 干货 | 深度学习的实践应用之路

    AI科技评论按:本文由图普科技编译自《Applying Deep Learning to Real-world Problems》,AI科技评论独家首发。 近年...

    AI科技评论
  • Linux系统安装Nodejs

    通过  uname -a  命令查看到我的Linux系统位数是64位(备注:x86_64表示64位系统, i686 i386表示32位系统),如图

    周小董
  • 大规模异步新闻爬虫【2】:实现功能强大,简洁易用的网址池(URL Pool)

    对于比较大型的爬虫来说,URL管理的管理是个核心问题,管理不好,就可能重复下载,也可能遗漏下载。这里,我们设计一个URL池来管理URL。 这个URL池就是一个...

    一墨编程学习
  • Java学习笔记——dubbo服务之底层通讯协议Protocol

    我们先来找到通讯协议的入口点吧。通过Protocol接口查找通讯协议入口点,我们根据接口的export方法搜索发现入口了,在ServiceConfig的doEx...

    慕容千语

扫码关注云+社区

领取腾讯云代金券