[DUBBO] Service Reference: RegistryDirectory

spilledyear
Published 2018-12-24 10:18:27
Collected in the column: 小白鼠

"Abridged version"

Service reference mainly involves ReferenceBean

public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
    ......
    @Override
    public Object getObject() throws Exception {
        return get();
    }
}

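ReferenceBean implements the FactoryBean interface, so when the instance is fetched, getObject is what actually gets called to return it. The get method called there is inherited from the ReferenceConfig class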

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}
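
The build-once-on-first-use behavior of get() can be sketched in isolation. This is a minimal illustration, not Dubbo code: LazyRef and its Supplier-based factory are hypothetical names, with the factory standing in for init() and createProxy().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for ReferenceConfig: builds its reference once, on the first get().
final class LazyRef<T> {
    private final Supplier<T> factory; // plays the role of init()/createProxy()
    private T ref;
    private boolean destroyed;

    LazyRef(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so that concurrent callers cannot build the reference twice
    synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            ref = factory.get();
        }
        return ref;
    }

    synchronized void destroy() {
        destroyed = true;
        ref = null;
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        LazyRef<String> lazy = new LazyRef<>(() -> "proxy#" + builds.incrementAndGet());
        System.out.println(lazy.get());
        System.out.println(lazy.get()); // same object, the factory is not called again
        System.out.println("factory calls: " + builds.get());
    }
}
```

Calling get() repeatedly returns the same object, and calling it after destroy() fails fast, which is the contract the original method enforces.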

init is what builds ref, so the core logic lives in the init method.

private void init() {
    ......
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    ......
    //attributes are stored by system context.
    StaticContext.getSystemContext().putAll(attributes);
    ref = createProxy(map);
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

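The init method is long, and a large part of it performs checks and packs attributes into map, so that part is skipped here in favor of the core createProxy(map) method. On the surface createProxy only creates a proxy object, but it contains a lot of logic: although it does ultimately return just a proxy object, it also creates and assembles the Invoker along the way.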

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
        // if the url attribute is configured, do not use a local (injvm) reference
        if (url != null && url.length() > 0) {
            isJvmRefer = false;
        } else {
            // by default, reference local service if there is
            // decide from the URL's protocol, scope, and injvm parameters whether to reference locally
            isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
        }
    } else {
        // use the configured injvm value
        isJvmRefer = isInjvm();
    }

    // local reference
    if (isJvmRefer) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    // remote reference
    } else {
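        // a non-empty url suggests the user may want a point-to-point call
        // direct connection means configuring the url on the consumer itself, bypassing the registry,
        // e.g. <dubbo:reference id="directService" url="dubbo://localhost:20882" interface="dubbo.direct.provider.DirectService" />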
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // call through the registry
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
            }
        }

        // a single registry or a single service provider (direct connection)
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            // multiple registries, multiple service providers, or a mix of both
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    return (T) proxyFactory.getProxy(invoker);
}
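
To make the remote branch of createProxy easier to follow, here is a hedged sketch of how the entries of a url setting could be told apart. ReferenceUrlClassifier, its labels, and the separator pattern are assumptions made for illustration; the real code works on Dubbo URL objects and Constants.SEMICOLON_SPLIT_PATTERN.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Dubbo: labels each entry of a url="..." setting.
final class ReferenceUrlClassifier {
    // Separator assumed for this sketch: one or more semicolons with optional whitespace.
    private static final Pattern SEMICOLON = Pattern.compile("\\s*;+\\s*");

    static List<String> classify(String urlConfig) {
        List<String> kinds = new ArrayList<>();
        if (urlConfig == null || urlConfig.trim().isEmpty()) {
            // No url configured: addresses would come from the registry configuration.
            kinds.add("from-registry-config");
            return kinds;
        }
        for (String u : SEMICOLON.split(urlConfig.trim())) {
            int idx = u.indexOf("://");
            String protocol = idx > 0 ? u.substring(0, idx) : "";
            // A registry:// entry still goes through a registry; anything else is point-to-point.
            kinds.add("registry".equals(protocol) ? "registry" : "direct");
        }
        return kinds;
    }

    public static void main(String[] args) {
        System.out.println(classify("dubbo://localhost:20882"));
        System.out.println(classify("registry://127.0.0.1:2181; dubbo://localhost:20882"));
        System.out.println(classify(null));
    }
}
```

A single entry corresponds to the urls.size() == 1 branch, while several entries correspond to the branch that joins multiple invokers through a cluster.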

My local consumer consumes through a registry, so the invoker is created by the following statement

private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

invoker = refprotocol.refer(interfaceClass, urls.get(0));

As mentioned in the previous section on service export, refprotocol here is the dynamically generated Protocol$Adaptive, so this calls the refer method of Protocol$Adaptive

public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg1;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = null;
    try {
        extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    } catch (Exception e) {
        if (count.incrementAndGet() == 1) {
            logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", e);
        }
        extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("dubbo");
    }
    return extension.refer(arg0, arg1);
}
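
The dispatch in the generated class boils down to three steps: read the protocol from the URL, look up the extension registered under that name, and fall back to the default when the lookup fails. A minimal sketch under that reading follows; MiniProtocol and AdaptiveDispatchDemo are hypothetical stand-ins, and a plain Map replaces ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of adaptive dispatch; a Map stands in for ExtensionLoader.
final class AdaptiveDispatchDemo {
    // Simplified stand-in for a Protocol extension.
    interface MiniProtocol {
        String refer(String url);
    }

    private final Map<String, MiniProtocol> extensions = new HashMap<>();
    private final String defaultName;

    AdaptiveDispatchDemo(String defaultName) {
        this.defaultName = defaultName;
    }

    void register(String name, MiniProtocol protocol) {
        extensions.put(name, protocol);
    }

    String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // The extension name is the URL's protocol, or the default when none is present.
        int idx = url.indexOf("://");
        String extName = idx > 0 ? url.substring(0, idx) : defaultName;
        MiniProtocol extension = extensions.get(extName);
        if (extension == null) {
            // Unknown name: fall back to the default extension.
            extension = extensions.get(defaultName);
        }
        if (extension == null) {
            throw new IllegalStateException("No extension for " + extName);
        }
        return extension.refer(url);
    }

    public static void main(String[] args) {
        AdaptiveDispatchDemo adaptive = new AdaptiveDispatchDemo("dubbo");
        adaptive.register("dubbo", u -> "DubboProtocol handles " + u);
        adaptive.register("registry", u -> "RegistryProtocol handles " + u);
        System.out.println(adaptive.refer("registry://localhost:2181/demo"));
        System.out.println(adaptive.refer("dubbo://localhost:20880/demo"));
    }
}
```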

Registry URLs use the registry protocol (they start with registry://), so the call lands in RegistryProtocol's refer method

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}
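
The group check at the end of refer decides which cluster is handed to doRefer. The rule can be sketched on its own as follows; GroupClusterChooser is a hypothetical name, and the comma pattern is an assumption standing in for Constants.COMMA_SPLIT_PATTERN.

```java
import java.util.regex.Pattern;

// Hypothetical helper mirroring the group check in RegistryProtocol.refer.
final class GroupClusterChooser {
    // Assumed separator: one or more commas with optional surrounding whitespace.
    private static final Pattern COMMA = Pattern.compile("\\s*,+\\s*");

    // True when results from several groups would have to be merged:
    // group="a,b" (more than one group) or group="*" (all groups).
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        for (String group : new String[] {"a,b", "*", "a"}) {
            System.out.println(group + " -> " + needsMergeableCluster(group));
        }
    }
}
```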

At this point the URL looks like this

zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=22552&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D22552%26qos.port%3D33333%26register.ip%3D10.10.128.134%26side%3Dconsumer%26timestamp%3D1544269297719&timestamp=1544269298561
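
The consumer's own settings travel inside the URL-encoded refer parameter of this URL. The sketch below decodes that parameter using only the JDK; ReferParamDecoder is a hypothetical helper (Dubbo itself uses url.getParameterAndDecoded and StringUtils.parseQueryString), and the URL in main is a shortened copy of the one above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: extracts and decodes the "refer" parameter of a registry URL.
final class ReferParamDecoder {
    // Splits "k1=v1&k2=v2" into an ordered map; pairs without '=' are ignored.
    static Map<String, String> parseQuery(String query) {
        Map<String, String> result = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    static Map<String, String> decodeRefer(String registryUrl) {
        int q = registryUrl.indexOf('?');
        String query = q >= 0 ? registryUrl.substring(q + 1) : "";
        String encoded = parseQuery(query).get("refer");
        if (encoded == null) {
            return new LinkedHashMap<>();
        }
        try {
            // The value of refer is itself a URL-encoded query string.
            return parseQuery(URLDecoder.decode(encoded, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String url = "zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService"
                + "?application=demo-consumer"
                + "&refer=application%3Ddemo-consumer%26interface%3Dorg.apache.dubbo.demo.DemoService"
                + "%26methods%3DsayHello%26side%3Dconsumer"
                + "&timestamp=1544269298561";
        decodeRefer(url).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Decoding shows that the service interface, method list, and side=consumer all belong to the reference being created, while the outer parameters describe the registry itself.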

In the same way, this calls the getRegistry method of RegistryFactory$Adaptive

public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg0;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(org.apache.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.registry.RegistryFactory extension = null;
    try {
        extension = (org.apache.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
    } catch (Exception e) {
        if (count.incrementAndGet() == 1) {
            logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.registry.RegistryFactory, will use default extension dubbo instead.", e);
        }
        extension = (org.apache.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension("dubbo");
    }
    return extension.getRegistry(arg0);
}

So the next call is ZookeeperRegistryFactory's getRegistry method. This is the same as in the service export process, so it is not covered again here.

doRefer

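After RegistryProtocol obtains the Registry registry, it checks the group attribute: if group lists several groups or is "*", the mergeable cluster is used instead of the default one. It then calls the doRefer method in RegistryProtocol.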

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        // the URL written under the consumers node in zk
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    // subscribe to the service address list
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    // the invoker built above holds the RegistryDirectory (all addresses of the service, routing rules, and so on)
    return invoker;
}
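
The register-then-subscribe flow in doRefer follows a plain observer pattern: the directory subscribes, and the registry pushes the address list to it whenever the list changes. The sketch below shows only that shape; MiniRegistry, MiniDirectory, and Listener are hypothetical in-memory stand-ins, not the Dubbo Registry or RegistryDirectory APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory illustration of subscribe/notify between a registry and a directory.
final class SubscribeNotifyDemo {
    interface Listener {
        void onProviders(List<String> providerUrls);
    }

    static final class MiniRegistry {
        private final List<String> providers = new ArrayList<>();
        private final List<Listener> listeners = new ArrayList<>();

        // A new subscriber immediately receives the current address list.
        void subscribe(Listener listener) {
            listeners.add(listener);
            listener.onProviders(new ArrayList<>(providers));
        }

        // Replaces the address list and pushes the change to every subscriber.
        void publish(List<String> urls) {
            providers.clear();
            providers.addAll(urls);
            for (Listener listener : listeners) {
                listener.onProviders(new ArrayList<>(providers));
            }
        }
    }

    static final class MiniDirectory implements Listener {
        private volatile List<String> current = Collections.emptyList();

        @Override
        public void onProviders(List<String> providerUrls) {
            current = Collections.unmodifiableList(providerUrls);
        }

        List<String> list() {
            return current;
        }
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        MiniDirectory directory = new MiniDirectory();
        registry.publish(Arrays.asList("dubbo://10.0.0.1:20880"));
        registry.subscribe(directory);
        System.out.println(directory.list());
        registry.publish(Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(directory.list());
    }
}
```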

RegistryDirectory

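Through its refresh method refreshInvoker, RegistryDirectory eventually calls DubboProtocol's refer method. Before the call reaches refer, however, it first passes through two Wrapper classes, which mainly deal with filters and listeners.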

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

The call chain triggered by the method above is as follows

getClients(url)==>
Exchangers.connect==>
HeaderExchanger.connect==>
Transporters.connect==>
NettyTransporter.connect==>
NettyClient.doOpen

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);

    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

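After this series of calls, the consumer ends up using Netty to establish a long-lived connection to the Provider, just as the Provider earlier used Netty to listen on port 20880 for client requests. If the same service has several different Providers, a Netty connection is established to every one of them. This completes the conversion from URL to Invoker, and the result is cached in the urlInvokerMap map, where the key is the URL and the value is the converted invoker.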
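
One way to picture a URL-keyed invoker cache is sketched below. InvokerCacheDemo and MiniInvoker are hypothetical, and the policy shown (reuse the invoker for a URL that is still present, create one for a new URL, destroy the ones whose URL disappeared) is an assumption for illustration; the text above only establishes that the map is keyed by URL.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a URL -> invoker cache that is rebuilt on every address change.
final class InvokerCacheDemo {
    static final class MiniInvoker {
        final String url;
        boolean destroyed;

        MiniInvoker(String url) {
            this.url = url;
        }
    }

    private Map<String, MiniInvoker> urlInvokerMap = new HashMap<>();
    private int created;

    void refresh(List<String> providerUrls) {
        Map<String, MiniInvoker> next = new HashMap<>();
        for (String url : providerUrls) {
            MiniInvoker invoker = urlInvokerMap.get(url);
            if (invoker == null) {
                // New address: this is where a real client connection would be opened.
                invoker = new MiniInvoker(url);
                created++;
            }
            next.put(url, invoker);
        }
        // Addresses that vanished from the list: release their invokers.
        for (Map.Entry<String, MiniInvoker> entry : urlInvokerMap.entrySet()) {
            if (!next.containsKey(entry.getKey())) {
                entry.getValue().destroyed = true;
            }
        }
        urlInvokerMap = next;
    }

    MiniInvoker get(String url) {
        return urlInvokerMap.get(url);
    }

    int created() {
        return created;
    }

    public static void main(String[] args) {
        InvokerCacheDemo cache = new InvokerCacheDemo();
        cache.refresh(Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        cache.refresh(Arrays.asList("dubbo://10.0.0.2:20880", "dubbo://10.0.0.3:20880"));
        System.out.println("invokers created: " + cache.created());
    }
}
```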

getProxy

We now have the final invoker for the consumer side. The next step, in ReferenceConfig, is to create a proxy object for this invoker

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

return (T) proxyFactory.getProxy(invoker);

In the same way, this calls the getProxy method of ProxyFactory$Adaptive

public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    org.apache.dubbo.rpc.ProxyFactory extension = null;
    try {
        extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    }catch(Exception e){
        if (count.incrementAndGet() == 1) {
            logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", e);
        }
        extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
    }
    return extension.getProxy(arg0, arg1);
}

When nothing is specified, the getProxy method of JavassistProxyFactory is called by default

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

InvokerInvocationHandler

The generated proxy class looks like this

public class proxy0 implements ClassGenerator.DC, EchoService, DemoService{
  public static Method[] methods;
  private InvocationHandler handler;

  public String sayHello(String paramString){
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
    return ((String)localObject);
  }

  public Object $echo(Object paramObject)
  {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramObject;
    Object localObject = this.handler.invoke(this, methods[1], arrayOfObject);
    return ((Object)localObject);
  }

  public proxy0(){
  }

  public proxy0(InvocationHandler paramInvocationHandler){
    this.handler = paramInvocationHandler;
  }
}

The corresponding InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }

        RpcInvocation invocation;
        if (RpcUtils.hasGeneratedFuture(method)) {
            Class<?> clazz = method.getDeclaringClass();
            String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
            Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
            invocation = new RpcInvocation(syncMethod, args);
            invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        } else {
            invocation = new RpcInvocation(method, args);
            if (RpcUtils.hasFutureReturnType(method)) {
                invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
                invocation.setAttachment(Constants.ASYNC_KEY, "true");
            }
        }
        return invoker.invoke(invocation).recreate();
    }
}

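As you can see, when the consumer calls a provider method, the proxy class ultimately still goes through the previously created Invoker to invoke the provider.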
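
The same proxy-to-invoker forwarding can be reproduced with the JDK alone. Note the swap: this sketch uses java.lang.reflect.Proxy rather than the Javassist-generated class shown earlier, and DemoService, MiniInvoker, and ForwardingHandler are simplified, hypothetical stand-ins for the Dubbo types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical sketch: a JDK dynamic proxy that forwards interface calls to an invoker.
final class ProxyToInvokerDemo {
    interface DemoService {
        String sayHello(String name);
    }

    // Simplified stand-in for Invoker: receives the method name and the arguments.
    interface MiniInvoker {
        Object invoke(String methodName, Object[] args);
    }

    static final class ForwardingHandler implements InvocationHandler {
        private final MiniInvoker invoker;

        ForwardingHandler(MiniInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // toString/hashCode/equals are answered locally by the invoker object.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Every business method becomes an invocation handed to the invoker.
            return invoker.invoke(method.getName(), args);
        }
    }

    static DemoService createProxy(MiniInvoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[] {DemoService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake invoker that answers locally instead of making a remote call.
        MiniInvoker fake = (methodName, a) -> "Hello " + a[0] + " (via " + methodName + ")";
        DemoService service = createProxy(fake);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only ever sees DemoService; swapping the fake invoker for one that performs a network call would not change the calling code, which is the point of placing a proxy in front of the Invoker.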

Originally published on the author's personal site/blog, 2018.12.08.