前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo源码学习-服务引用(服务启动阶段)

Dubbo源码学习-服务引用(服务启动阶段)

作者头像
炳臣
发布2020-03-20 11:32:21
1K0
发布2020-03-20 11:32:21
举报
文章被收录于专栏:一块自留地一块自留地

一、服务消费端应该做哪些事?

  • 生成代理对象(帮我们实现通信细节)
  • 建立通信连接(netty)
  • 从zk获取服务提供者地址(订阅提供者)
  • 负载均衡
  • 容错
  • 序列化
  • ...

二、两个步骤

上面的逻辑可以大致分为两个步骤

  • 服务启动阶段 构建通信连接,创建代理对象
  • 远程调用阶段 远程调用服务提供者方法

三、服务启动阶段

Dubbo 服务引用的时机有两个:

  • 第一个是在 Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务
  • 第二个是在 ReferenceBean对应的服务被注入到其他类中时引用

这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。 默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。

通常我们在代码中通过如下方式使用dubbo服务:

代码语言:javascript
复制
@Reference
private ISayHelloService iSayHelloService;

ISayHelloService.hello();

可以通过注解@Reference或者配置文件dubbo-consumer.xml的方式,我这里用的是注解。

我们先来debug看一下,这个ISayHelloService对象是个什么东西。

可以看到,ISayHelloService是个代理对象,并且是个包装类,里面包了很多层。

ReferenceAnnotationBeanPostProcessor -> ReferenceBean -> InvokerHandler -> mockClusterInvoker -> RegistryDirectory

熟悉spring的同学应该知道,BeanPostProcessor这样的类会在spring启动时执行,所以我们从这个类入手。

代码语言:javascript
复制
//ReferenceAnnotationBeanPostProcessor.java

private static class ReferenceBeanInvocationHandler implements InvocationHandler {

        private final ReferenceBean referenceBean;

        private Object bean;

        private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
            this.referenceBean = referenceBean;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            return method.invoke(bean, args);
        }
        //初始化方法
        private void init() {
            this.bean = referenceBean.get();
        }
    }
复制代码
ReferenceBean.get()

继续追踪 referenceBean.get():

代码语言:javascript
复制
//ReferenceConfig.java

//获取服务引用
public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        // 检测 ref 是否为空,为空则通过 init 方法创建
        if (ref == null) {
            // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
            init();
        }
        return ref;
    }
复制代码

继续追踪 init():

代码语言:javascript
复制
//ReferenceConfig.java

private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // get consumer's global configuration
        checkDefault();
        //填充ConsumerConfig
        appendProperties(this);
        //...省略
        
        //组装URL
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        //...省略
        
        //获取注册中心host
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        //重点!! 创建代理对象
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
复制代码

这里的代码非常多,我省略了一些,我们主要看大致流程。 首先是通过系统变量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。 然后收集各种配置,并将配置存储到 map,用以组装URL。 最后创建代理对象。

createProxy()

我们重点来看创建代理对象的方法:

代码语言:javascript
复制
//ReferenceConfig.java

private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        //...省略
        
        //本地引用
        if (isJvmRefer) {
           //...省略
        } else {
            //远程引用-直连
            if (url != null && url.length() > 0) { 
                //...省略
            } else { //远程引用-走注册中心
            
                // 加载注册中心 url
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                       
                       // 添加 refer 参数到 url 中,并将 url 添加到 urls 中 
                       urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
            // 单个注册中心或服务提供者(服务直连,下同)
            if (urls.size() == 1) {
                //自适应 -> wrapper(filter(RegisterProtocol)).refer
                // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                //...省略
            }
        }

        //生成代理类
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
复制代码

这里的逻辑比较简单,对服务调用的方式进行逻辑处理,我们直接看最重要的refprotocol.refer(interfaceClass, urls.get(0))方法。

这个方法的入参urls.get(0)是注册中心的url,解析出来应该是类似于registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")

基于扩展点自适应机制,通过 URL 的 registry:// 协议头识别,就会调用 RegistryProtocol.refer() 方法,基于 refer 参数中的条件,查询提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0

然后通过提供者 URL 的 dubbo:// 协议头识别,就会调用 DubboProtocol.refer() 方法,得到提供者引用。

RegistryProtocol.refer()

所以我们接着看refer()方法:

代码语言:javascript
复制
//RegistryProtocol.java

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //获取注册中心对象
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        //...省略
        
        // 调用 doRefer 继续执行服务引用逻辑
        return doRefer(cluster, registry, type, url);
    }
复制代码

继续追踪 doRefer()

代码语言:javascript
复制
//RegistryProtocol.java

/**
     *
     * @param cluster
     * @param registry 注册中心对象
     * @param type 
     * @param url   注册中心url
     * @param <T>
     * @return
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //获得 RegistryDirectory 对象,即服务目录
        //服务目录类似于注册中心,管理生产者ip、port等,实际上是一个invoker集合
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry); //设置注册中心
        directory.setProtocol(protocol); //设置服务提供者
        // 创建订阅 URL 即 消费者url
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注册中心注册自己(服务消费者)
        //把自己写入zk中的 /dubbo/com.foo.BarService/consumers 目录
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        // 订阅 providers、configurators、routers 等节点数据
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
        //MockClusterWrapper(FailoverCluster)
        Invoker invoker = cluster.join(directory);
        // 向本地注册表,注册消费者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
复制代码

这里主要做了以下事情:

  • 连接注册中心
  • 向注册中心注册自己(服务消费者)
  • 订阅注册中心providers等节点
subscribe()

我们看最重要的directory.subscribe()方法,即订阅:

代码语言:javascript
复制
//ZookeeperRegistry.java

/**
     *
     * @param url 消费者url consumer://
     * @param listener  RegistryDirectory
     */
    @Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // interface = *,即订阅全局
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                //...省略
            } else {
                // interface = 特定接口,只有这个接口的子节点改变时,才触发回调
                // 如:interface = com.lol.test.SayFacade
                // Service 层下的所有 URL
                List<URL> urls = new ArrayList<URL>();
                //path是zk中生产者的目录:path -> /dubbo/com.foo.BarService/providers
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // 获得 ChildListener 对象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 服务提供者url变更时,调用 `#notify(...)` 方法,通过监听器回调
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    //children 是 子节点 -> 服务提供者
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        //urls 是真正的服务提供者url
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //第一次订阅,全量通知,创建对应invoker
                notify(url, listener, urls);
            }
        } 
    }
复制代码

这里的大致逻辑就是:

  • 通过zk获取对应service的服务提供者urls
  • 然后通过RegistryDirectory对这些urls进行监听,如果有变动,则调用notify()方法
  • 第一次则对所有urls调用notify()方法
notify()

接下来我们重点看notify():

代码语言:javascript
复制
//AbstractRegistry.java

/**
     *
     * @param url 服务消费者url consumer://
     * @param listener RegistryDirectory
     * @param urls 服务提供者urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        //...省略
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            //categoryList 服务提供者url
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            listener.notify(categoryList);
        }
    }
复制代码

继续追踪

代码语言:javascript
复制
//RegistryDirectory.java

/**
     * 接收服务变更通知
     * @param urls 服务提供者url集合
     */
    @Override
    public synchronized void notify(List<URL> urls) {
        // 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            // 获取 category 参数
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            // 根据 category 参数将 url 分别放到不同的列表中
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                // 添加服务提供者 url
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        // 将 url 转成 Configurator
        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // 将 url 转成 Router
        if (routerUrls != null && !routerUrls.isEmpty()) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && !localConfigurators.isEmpty()) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        //重点!! 刷新 Invoker 列表
        refreshInvoker(invokerUrls);
    }
复制代码

这里我们只需要看最后一行代码refreshInvoker(invokerUrls),它的作用是刷新invoker。 该方法是保证RegistryDirectory中的服务提供者集合methodInvokerMap随注册中心变化而变化的关键。

refreshInvoker()
代码语言:javascript
复制
//RegistryDirectory.java

 /**
     * 
     * @param invokerUrls 服务提供者url
     */
    private void refreshInvoker(List<URL> invokerUrls) {
        // 如果invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            // 设置 forbidden 为 true
            this.forbidden = true; 
            this.methodInvokerMap = null; 
            // 销毁所有 Invoker
            destroyAllInvokers(); 
        } else {
            this.forbidden = false; 
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; 
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                // 添加缓存 url 到 invokerUrls 中
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                // 缓存 invokerUrls
                this.cachedInvokerUrls.addAll(invokerUrls);
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // 将 url 转成 Invoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
            // state change
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            // 合并多个组的 Invoker
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                // 销毁无用 Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
复制代码

这里的大致逻辑为:

  • 根据协议头来判断是否禁用服务
  • 将url转为invoker
  • 销毁无用的 Invoker

首先当url 协议头为empty://,此时表示禁用所有服务,会销毁所有Invoker。 然后把将url转为invoker,得到 <url, Invoker> 的映射关系。然后进一步进行转换,得到 <methodName, Invoker 列表> 映射关系。 之后进行多组 Invoker 合并操作,并将合并结果赋值给 methodInvokerMap。 最后销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务

Invoker 是 Dubbo 的核心模型,代表一个可执行体。在服务提供方,Invoker 用于调用服务提供类。在服务消费方,Invoker 用于执行远程调用。

此时的重点就是url转为invoker的过程了,因为dubbo远程调用是通过invoker来进行的,所以在转变的过程中肯定有很多重要的内容。

toInvokers()

我们接着看toInvokers(invokerUrls)

代码语言:javascript
复制
//RegistryDirectory.java

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    // 获取服务消费端配置的协议
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            // 检测服务提供者协议是否被服务消费者所支持
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                // 若服务消费者协议头不被消费者所支持,则忽略当前 providerUrl
                continue;
            }
        }
        // 忽略 empty 协议
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // 通过 SPI 检测服务端协议是否被消费端支持,不支持则抛出异常
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol..."));
            continue;
        }
        
        // 合并 url
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString();
        if (keys.contains(key)) {
            // 忽略重复 url
            continue;
        }
        keys.add(key);
        // 将本地 Invoker 缓存赋值给 localUrlInvokerMap
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
        // 获取与 url 对应的 Invoker
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        // 缓存未命中
        if (invoker == null) {
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    // 获取 disable 配置,取反,然后赋值给 enable 变量
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    // 获取 enable 配置,并赋值给 enable 变量
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // 调用 refer 获取 Invoker
                    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface...");
            }
            if (invoker != null) {
                // 缓存 Invoker 实例
                newUrlInvokerMap.put(key, invoker);
            }
            
        // 缓存命中
        } else {
            // 将 invoker 存储到 newUrlInvokerMap 中
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

这里的逻辑很简单:

  • 首先会对服务提供者 url 进行检测,若服务消费端的配置不支持服务端的协议,或服务端 url 协议头为 empty 时,toInvokers 均会忽略服务提供方 url。
  • 合并 url,然后访问缓存,尝试获取与 url 对应的 invoker。
  • 如果缓存命中,直接将 Invoker 存入 newUrlInvokerMap 中。
  • 如果未命中,则需新建 Invoker。
DubboProtocol.refer()

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); 这段代码就是创建invoker的代码,这里通过自适应拿到的是DubboProtocol, 所以我们深入看一下DubboProtocol.refer():

代码语言:javascript
复制
//DubboProtocol.java

    //客户端实例
    private final ExchangeClient[] clients;

    private final AtomicPositiveInteger index = new AtomicPositiveInteger();

    private final String version;

    private final ReentrantLock destroyLock = new ReentrantLock();

    private final Set<Invoker<?>> invokers;

/**
  * url 服务提供者url
  */
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 创建 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
复制代码

这里的代码很简单,只是new了一个Invoker。 重点是对DubboInvoker的属性进行填充。 我们先来看下ExchangeClient[] clients属性,它是客户端实例集合,通过getClients(url)方法获取。

ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如 NettyClient、MinaClient 等,默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。

getClients()
代码语言:javascript
复制
//DubboProtocol.java

private ExchangeClient[] getClients(URL url) {
    // 是否共享连接
    boolean service_share_connect = false;
  	// 获取连接数,默认为0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections,则共享连接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 获取共享客户端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客户端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这两个方法。

代码语言:javascript
复制
//DubboProtocol.java

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 获取带有“引用计数”功能的 ExchangeClient
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // 增加引用计数
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        if (referenceClientMap.containsKey(key)) {
            return referenceClientMap.get(key);
        }

        // 创建 ExchangeClient 客户端
        ExchangeClient exchangeClient = initClient(url);
        // 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        locks.remove(key);
        return client;
    }
}
复制代码

这里先尝试获取有引用计数功能的共享实例,如果获取不到,则通过initClient(url)新创建一个。

initClient()
代码语言:javascript
复制
//DubboProtocol.java

private ExchangeClient initClient(URL url) {

    // 获取客户端类型,默认为 netty4
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加编解码和心跳包参数到 url 中
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 检测客户端类型是否存在,不存在则抛出异常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: ...");
    }

    ExchangeClient client;
    try {
        // 获取 lazy 配置,并根据配置值决定创建的客户端类型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 创建懒加载 ExchangeClient 实例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 创建普通 ExchangeClient 实例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}
复制代码

initClient 方法首先获取用户配置的客户端类型,默认为 netty4。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。

接下来看创建client的方法Exchangers.connect(url, requestHandler)

代码语言:javascript
复制
//Exchangers.java

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 获取 Exchanger 实例,默认为 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}
复制代码

这里通过自适应,调用HeaderExchanger.connect()

代码语言:javascript
复制
//HeaderExchanger.java

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 这里包含了多个调用,分别如下:
    // 1. 创建 HeaderExchangeHandler 对象
    // 2. 创建 DecodeHandler 对象
    // 3. 通过 Transporters 构建 Client 实例
    // 4. 创建 HeaderExchangeClient 对象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
复制代码

这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:

代码语言:javascript
复制
//Transporters.java

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
    return getTransporter().connect(url, handler);
}
复制代码

getTransporter() 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。如下:

connect()
代码语言:javascript
复制
//NettyTransporter.java

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 创建 NettyClient 对象
    return new NettyClient(url, listener);
}
复制代码

到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了,大家有兴趣可以自己看看。

createProxy()

到这里,服务提供者的Invoker已经创建好了,接下来就是创建代理对象。 也就是ReferenceConfig.createProxy()方法,这里就不多赘述了。

总结

代理对象创建完毕后,dubbo服务引用的服务启动阶段已经完成了,再来回顾下我们都做了哪些事:

  • 连接注册中心
  • 向注册中心注册自己(服务消费者)
  • 订阅注册中心服务提供者,并获取服务提供者url
  • 创建Invoker
  • 建立通信,连接netty
  • 创建代理对象

服务启动之后,接下来就是远程调用阶段,我们将在下篇文章中详细分析。

最后,简单的画个流程图,方便理解。

参考: Dubbo官网

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020年03月12日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、两个步骤
  • 三、服务启动阶段
    • ReferenceBean.get()
      • createProxy()
      • RegistryProtocol.refer()
      • notify()
      • refreshInvoker()
      • toInvokers()
      • DubboProtocol.refer()
      • getClients()
      • initClient()
      • connect()
      • createProxy()
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档