前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码篇03---点点直连如何实现及背后原理

Dubbo源码篇03---点点直连如何实现及背后原理

作者头像
大忽悠爱学习
发布2023-05-23 10:09:58
3050
发布2023-05-23 10:09:58
举报
文章被收录于专栏:c++与qt学习

Dubbo源码篇03---从点点直连探究Complier编译的原理

什么是点点直连

Dubbo正常的请求模型,都是消费端从注册中心拉取服务提供者列表,然后采用适当的负载均衡策略,挑选出一个服务提供者URL,随机发起请求。

但是,Dubbo也给我们提供了一种方式,可以在没有注册中心的时候,直接使用提前设置好的URL发起请求,或者在有注册的中心的时候,绕过注册中心,使用设置好的URL发起请求,这种方式也被称为点点直连。

在这里插入图片描述
在这里插入图片描述

那么点点直连在实际项目开发过程中,究竟有没有用处呢?

下面我们跟随着实际需求的视角,具体来看看吧。


实际需求

在这里插入图片描述
在这里插入图片描述

订单系统这边由于入库订单的状态异常,导致该笔订单消息及时无法推送到供应商系统,从而阻碍了该笔订单在供应商侧的功能运转。

为了争取最短时间内恢复这笔订单的功能运转,我们需要尽快修改这条推送记录在数据库的状态,此时我们可能会想到以下几个做法:

  1. 通过编写update语句直接修改线上那条出现问题的记录,但是通常一家公司中的数据订正流程会很繁琐,耗时较长,并非这里的最佳选择
  2. 利用Web服务器后台的日志,重放一遍用户的请求,但是问题在于并不是所有场景都能根据重放用户请求解决,需要根据具体业务场景进行抉择
  3. 在深入理解JVM第三版一书中曾介绍过使用java类加载器提供的热更新能力实现动态调试线上服务的实现,我们能否借鉴这一思路,编写调用DAO层完成记录状态更新的代码,然后通过暴露出来的调试接口,将代码上传,然后利用类加载器提供的热更新技术动态加载类,然后调用调试方法完成订单状态更新呢?

如何实现动态编译?

Java代码从编译到执行的流程如下所示:

在这里插入图片描述
在这里插入图片描述

开发者编写的“Java 源代码”被编译后变成 class 字节码文件,然后字节码文件被 JVM 加载,直到变成可使用的类。

在这样的开发过程中,动态编译一般有两种方式:

  • 自主编码实现,比如通过 Runtime 调用 javac,或者通过 JavaCompile 调用 run。
  • 调用插件实现,比如使用市面上常用的 groovy-all.jar 插件。

出于简单性考虑,本文使用groovy插件实现java代码的动态编译。


如何发起调用?

由于需要将用于修复的代码上传到生产环境的机器上执行,因此每一个生产环境服务都需要对外暴露一个接口,用于接收动态调试请求:

在这里插入图片描述
在这里插入图片描述

由于修复代码需要上传到生产环境执行,因此为了避免引发不必要的产线事故,我们一般会拿某台机器节点做个小范围的验证,也就是说,这里需要用到一开始讲到的点点直连技术。

那么下一个问题就来了,如何实现点点直连呢?


点点直连原理

Dubbo在ReferenceConfig的父类ReferenceConfigBase类中提供了一个名为Url的字段:

代码语言:javascript
复制
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
    /**
     * The url for peer-to-peer invocation
     * 专为点到点连接而设计的
     */
    protected String url;
    ....

那么该字段的构成规则是怎样的呢? 又是如何起的作用的呢?

下面我们来简单追踪一下url被使用到的地方:

  • 当消费者端服务启动时,会为指定的服务接口创建一个代理,创建代理需要用到的客户端配置参数由ReferenceConfig负责提供,因此创建代理的动作也是在ReferenceConfig内部的createProxy方法内完成的
代码语言:javascript
复制
   //ReferenceConfig
   private T createProxy(Map<String, String> referenceParameters) {
        ...
          // 是否配置了客户端用户点点直连的Url
          if (StringUtils.isNotEmpty(url)) {
                //如果消费者端配置了url属性,那么dubbo会认为该rul是一个点对点地址,或者是一个注册中心的地址
                parseUrl(referenceParameters);
          } else {
                // dubbo走从注册中心拉取服务提供者url那套逻辑
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    aggregateUrlFromRegistry(referenceParameters);
                }
         }
        createInvokerForRemote();
        ...
        URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
        consumerUrl = consumerUrl.setScopeModel(getScopeModel());
        consumerUrl = consumerUrl.setServiceModel(consumerModel);
        MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());

        // create service proxy
        return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
  • referenceParameters保存了客户端各种配置
在这里插入图片描述
在这里插入图片描述

parseUrl方法负责解析用户配置的点对点直连URL:

代码语言:javascript
复制
    //ReferenceConfig
    private void parseUrl(Map<String, String> referenceParameters) {
        //按照空格,;对消费者端设置的url进行切分,这里说明一个url属性中,我们可以通过空格或者;设置多个服务提供者的直连地址
        //或者指定一个或者多个专属的注册中心地址
        String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
        if (ArrayUtils.isNotEmpty(us)) {
            for (String u : us) {
                //解析当前url字符串,并解析为一个Dubbo提供的URL对象返回
                URL url = URL.valueOf(u);
                //url内部对象urlAddress对象的path属性--具体看下图
                if (StringUtils.isEmpty(url.getPath())) {
                    //大部分情况下我们不会指定path,因此一般path值默认为服务接口名
                    url = url.setPath(interfaceName);
                }
                ...
                //判断我们设置的是否是一个注册中心地址
                if (UrlUtils.isRegistry(url)) {
                    //添加进urls集合保存,并且使用REFER_KEY属性表明当前url代表的是注册中心地址
                    urls.add(url.putAttribute(REFER_KEY, referenceParameters));
                } else {
                   //将referenceParameters集合中的参数以&的形式拼接在当前url后面,类比http的请求参数url的拼接方式
                   //然后将拼接完整的url添加进urls集合
                    URL peerUrl = getScopeModel().getApplicationModel().getBeanFactory().getBean(ClusterUtils.class).mergeUrl(url, referenceParameters);
                    peerUrl = peerUrl.putAttribute(PEER_KEY, true);
                    urls.add(peerUrl);
                }
            }
        }
    }
  • URL.valueOf方法负责解析当前url字符串,并解析为一个Dubbo提供的URL对象返回,格式如下:
在这里插入图片描述
在这里插入图片描述
  • url.getPath方法返回的是Url内部的urlAddress对象的path属性
在这里插入图片描述
在这里插入图片描述
  • 不指定path,默认被设置为服务接口名的情况
在这里插入图片描述
在这里插入图片描述
  • peerUrl拼接得到的结果
代码语言:javascript
复制
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService&param=value&pid=4600&register.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000

从parseUrl方法逻辑可知,dubbo会将客户端各种配置参数以类似http请求参数的url拼接方式组织起来。


createInvokerForRemote方法负责构造发起请求调用的Invoker对象:

代码语言:javascript
复制
    private void createInvokerForRemote() {
       //此处我们的urls集合中的url只有一个,有多个逻辑这里跳过不看
       //如果urls的长度为1,说明只有一个服务提供者,则直接通过protocolSPI.refer方法创建一个Invoker实例,
       //如果这个服务提供者不是注册中心,则使用StaticDirectory对这个Invoker进行包装。
       //StaticDirectory是Dubbo框架中的一个类,用于将一组Invoker封装成一个目录,以便消费者调用
        if (urls.size() == 1) {
            URL curUrl = urls.get(0);
            //这里根据urlAddress内部的protocol属性作为key,通过dubbo的SPI机制寻找对应协议的实现类 
            //这里实际调用的是DubboProtocol的refer方法,因为我们这里urlAddress的protocol值为dubbo
            invoker = protocolSPI.refer(interfaceClass, curUrl);
            //如果当前url并非指代一个注册中心地址
            if (!UrlUtils.isRegistry(curUrl)) {
                List<Invoker<?>> invokers = new ArrayList<>();
                invokers.add(invoker);
                //默认情况下Cluster会通过Registry拿到一堆服务提供方的IP地址列表后,然后通过一定的路由和负载均衡策略决定具体选择调用哪一个Provider      
                invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
            }
        } else {
           ...
        }
    }
  • dubbo的Adaptive动态适配机制会为ProtocolSPI类创建一个代理对象,代理对象的refer如下所示:(本系列还未讲到dubbo SPI原理部分,所以这部分大家先了解即可,感兴趣的也可以自行研究一下)
代码语言:javascript
复制
public class Protocol$Adaptive
        implements Protocol {
        
    public Invoker refer(Class clazz, URL uRL) throws RpcException {
        String string;
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
        if (string == null) {
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + uRL + ") use keys([protocol])");
        }
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(uRL.getScopeModel(), Protocol.class);
        Protocol protocol = scopeModel.getExtensionLoader(Protocol.class).getExtension(string);
        return protocol.refer(clazz, uRL);
    }
    ...

DubboProtocol的refer方法实现如下:

代码语言:javascript
复制
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ... 
        return protocolBindingRefer(type, url);
    }

    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        ... 
        // create rpc invoker.
        // dubbo总共分为十层,各个层之间的交互主要是通过Inovker完成的,可以理解分层的实现是Invoker套Invoker
        //这里只需要知道invoker的doInvoke方法中会完成本层应该做的逻辑
        //例如这里DubboInvoker会在protocol层完成相关逻辑处理
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        ...
        return invoker;
    }    

实现点点直连

通过上面的分析可知,dubbo为我们在客户端配置中提供了一个url参数用来实现点点直连,url的构成规则为:

  • [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
代码语言:javascript
复制
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService&param=value&pid=4600&register.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000

可见dubbo的url 的构成规则,居然和 http 的构成规则如出一辙,那我们试着通过赋值 url 为dubbo://[机器IP结点]:[机器IP提供Dubbo服务的端口],应该就大功告成了。

在这里插入图片描述
在这里插入图片描述

准备一个页面,填入 5 个字段信息,接口类名、接口方法名、接口方法参数类名、指定的 URL 节点、修复问题的 Java 代码,然后将这 5 个字段通过 HTTP 请求发往 Web 服务器,Web 服务器接收到请求后组装泛化所需对象,最后通过泛化调用的形式完成功能修复。


消费端
  • 负责接收动态调试请求的控制器
代码语言:javascript
复制
@RestController
public class DynamicDebugController {
    private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";

    @PostMapping("/gateway/dynamic/debug/request")
    public Object repairRequest(@RequestBody DynamicDebugRequest dynamicDebugRequest) {
        // 将入参的req转为下游方法的入参对象,并发起远程调用
        return commonInvoke(dynamicDebugRequest);
    }

    private Object commonInvoke(DynamicDebugRequest dynamicDebugRequest) {
        // 然后试图通过类信息对象想办法获取到该类对应的实例对象
        ReferenceConfig<GenericService> referenceConfig =
                createReferenceConfig(dynamicDebugRequest.getClassName(), dynamicDebugRequest.getUrl());

        // 远程调用
        GenericService genericService = referenceConfig.get();
        return genericService.$invoke(
                dynamicDebugRequest.getMtdName(),
                new String[]{dynamicDebugRequest.getParameterTypeName()},
                new Object[]{dynamicDebugRequest.getParamsMap()});
    }

    private static ReferenceConfig<GenericService> createReferenceConfig(String className, String url) {
        DubboBootstrap dubboBootstrap = DubboBootstrap.getInstance();
        // 设置应用服务名称
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName(dubboBootstrap.getApplicationModel().getApplicationName());
        // 设置注册中心的地址
        RegistryConfig registryConfig = new RegistryConfig(zookeeperAddress);
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setApplication(applicationConfig);
        referenceConfig.setRegistry(registryConfig);
        referenceConfig.setInterface(className);
        // 设置泛化调用形式
        referenceConfig.setGeneric("true");
        // 设置默认超时时间5秒
        referenceConfig.setTimeout(5 * 1000);
        // 设置点对点连接的地址
        referenceConfig.setUrl(url);
        return referenceConfig;
    }
}
  • 承载动态调试请求参数的对象
代码语言:javascript
复制
@Setter
@Getter
public class DynamicDebugRequest {
    /**
     * <h2>接口类名,例:com.provider.one.DynamicDebugService</h2>
     **/
    private String className;
    /**
     * <h2>接口方法名,例:dynamicDebug</h2>
     **/
    private String mtdName;
    /**
     * <h2>接口方法参数类名,例:com.provider.one.DynamicRequest</h2>
     **/
    private String parameterTypeName;
    /**
     * <h2>指定的URL节点,例:dubbo://ip:port</h2>
     **/
    private String url;
    /**
     * <h2>可以是调用具体接口的请求参数,也可以是修复问题的Java代码</h2>
     **/
    private Map<String,String> paramsMap;
}

提供端
  • 服务提供者的启动类
代码语言:javascript
复制
public class Provider {
    private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";

    public static void main(String[] args) throws InterruptedException {
        //启动内嵌的zk
        new EmbeddedZooKeeper(2181, false).start();

        //创建ApplicationConfig
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("dynamic-debug-service-provider");
        //创建注册中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress(zookeeperAddress);

        //新建服务实现类,注意要使用GenericService接收
        DynamicDebugService helloService = new DynamicDebugServiceImpl();

        //创建服务相关配置
        ServiceConfig<DynamicDebugService> service = new ServiceConfig<>();
        service.setApplication(applicationConfig);
        service.setRegistry(registryConfig);
        service.setInterface(DynamicDebugService.class);
        service.setRef(helloService);
        service.export();

        new CountDownLatch(1).await();
    }
}

官方文档Demo提供的EmbeddedZooKeeper类源码,大家copy到自己本地即可

  • 对外暴露的动态调试服务接口
代码语言:javascript
复制
public interface DynamicDebugService {
    /**
     * 定义了一个专门处理万能修复逻辑的Dubbo接口
     */
    Object dynamicDebug(Map<String,String> req);
}
  • 服务接口的实现类
代码语言:javascript
复制
public class DynamicDebugServiceImpl implements DynamicDebugService {
    private final GroovyClassLoader groovyClassLoader = new GroovyClassLoader();

    @SneakyThrows
    @Override
    public Object dynamicDebug(Map<String,String> req) {
        // 编译 Java 代码,然后变成 JVM 可识别的 Class 对象信息
        Class<?> javaClass = compile(req.get("code"));
        //和spring结合的扩展思路: 创建实例对象,并经过spring的后置处理
        // Object bean = instantiationAndPostProcessBean(javaClass);
        //这里没有和spring结合,直接简单实例化即可
        Object bean = javaClass.newInstance();
        if(!(bean instanceof Function)){
            throw new IllegalArgumentException("动态java类并非Function类型");
        }
        Function<Map<String,String>, Object> function = (Function) bean;
        // 执行单例对象的方法即可
        return function.apply(req);
    }


    /**
     * 利用 groovy-all.jar 中的 groovyClassLoader 来编译 Java 代码
     */
    private Class<?> compile(String javaCode) {
        return groovyClassLoader.parseClass(javaCode);
    }

    /**
     * 实例化bean,并经过spring的所有后置处理,但是不放入Spring容器中
     */
    private Object instantiationAndPostProcessBean(Class<?> javaClass) {
        return ((DefaultListableBeanFactory) SpringUtil.getBeanFactory()).createBean(javaClass);
    }
}

测试

测试,首先我们需要准备一个测试类:

代码语言:javascript
复制
public class TestJavaCode implements Function<Map<String,String>,String> {
    @Override
    public String apply(Map<String,String> s) {
        System.out.println("执行动态方法: "+s);
        return "res: "+s;
    }
}

该类会作为请求参数传递给动态调试控制器,然后由动态调试控制器通过泛化调用,来调用服务端的动态调试服务接口,最终执行测试的apply方法。

  • 请求参数和请求结果
代码语言:javascript
复制
{
    "className":"com.provider.one.DynamicDebugService",
    "mtdName":"dynamicDebug",
    "parameterTypeName":"java.util.Map",
    "url":"dubbo://192.168.154.1:20880/com.provider.one.DynamicDebugService",
    "paramsMap": {
        "code": "package com.provider.one.code;import java.util.function.Function;public class TestJavaCode implements Function<Map<String,String>,String> {@Override public String apply(Map<String,String> s) {System.out.println(\"执行动态方法: \"+s);return \"res: \"+s;}}"
    }
}
在这里插入图片描述
在这里插入图片描述

点点直连小结

哪些应用场景需要用到点点直连呢?

  • 第一,修复生产环境突然Bug事件,通过直连 + 泛化 + 动态代码编译执行,可以轻松临时解决产线棘手的问题。
  • 第二,绕过注册中心直接联调测试,有些公司由于测试环境的复杂性,有时候不得不采用简单的直连方式,来快速联调测试验证功能。
  • 第三,检查服务存活状态,如果需要针对多台机器进行存活检查,那就需要循环调用所有服务的存活检查接口。

点点直连实现简单来说,分为如下几步:

  • 接口类名、接口方法名、接口方法参数类名、业务请求参数,四个维度的数据不能少。
  • 根据接口类名创建 ReferenceConfig 对象,设置 generic = true 、url = 协议 +IP+PORT 两个重要属性,调用 referenceConfig.get 拿到 genericService 泛化对象。
  • 传入接口方法名、接口方法参数类名、业务请求参数,调用 genericService.$invoke 方法拿到响应对象。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Dubbo源码篇03---从点点直连探究Complier编译的原理
  • 实际需求
    • 如何实现动态编译?
      • 如何发起调用?
        • 点点直连原理
        • 实现点点直连
    • 点点直连小结
    相关产品与服务
    微服务引擎 TSE
    微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档