前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dubbo系列二之Consumer启动

Dubbo系列二之Consumer启动

原创
作者头像
用户9511949
发布2024-07-18 19:59:00
910
发布2024-07-18 19:59:00

1 @EnableDubbo

先看下Dubbo源码中demo启动类

代码语言:javascript
复制
@SpringBootApplication
@EnableDubbo(scanBasePackages = {"org.apache.dubbo.springboot.demo.provider"})
public class ProviderApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ProviderApplication.class, args);
        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

通过@EnableDubbo注解启动,该注解整合了@EnableDubboConfig和@DubboCompoentScan,这两个注解分别import了DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar

代码语言:javascript
复制
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {......}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {......}

DubboConfigConfigurationRegistrar和DubboComponentScanRegistrar都是在BeanDefinitionRegistry加载BeanDefinition的过程中向BeanDefinitionRegistry注册自定义的BeanDefinition,代码如下,都调用了DubboSpringInitializer.initialize方法初始化DubboSpringInitContext(如果存在就不再初始化)

代码语言:javascript
复制
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // initialize dubbo beans
        DubboSpringInitializer.initialize(registry);BeanDefinitionRegistry
    }
}

public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // initialize dubbo beans
        DubboSpringInitializer.initialize(registry);
        Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
        registerServiceAnnotationPostProcessor(packagesToScan, registry);
    }
}

DubboSpringInitializer.initialize最终调用DubboBeanUtils.registerCommonBeans注册一系列Dubbo的基础服务类,如下

代码语言:javascript
复制
static void registerCommonBeans(BeanDefinitionRegistry registry) {

    registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);

    registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);

    // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
    registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
        ReferenceAnnotationBeanPostProcessor.class);

    // TODO Whether DubboConfigAliasPostProcessor can be removed ?
    // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
    registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
        DubboConfigAliasPostProcessor.class);

    // register ApplicationListeners
    registerInfrastructureBean(registry, DubboDeployApplicationListener.class.getName(), DubboDeployApplicationListener.class);
    registerInfrastructureBean(registry, DubboConfigApplicationListener.class.getName(), DubboConfigApplicationListener.class);

    // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
    registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
        DubboConfigDefaultPropertyValueBeanPostProcessor.class);

    // Dubbo config initializer
    registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);

    // register infra bean if not exists later
    registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
}

这里主要先提下这两个类,后面详细分析

ReferenceAnnotationBeanPostProcessor:处理服务消费者的Processor

DubboDeployApplicationListener:Provider服务发布和Consumer引用的Listener

上面的所有内容在之前的文章中已经提到过,这样再说明一下,以便知道入口在哪里。

2 ReferenceAnnotationBeanPostProcessor

ReferenceAnnotationBeanPostProcessor实现了BeanFactoryPostProcessor接口,在BeanDefinition加载完了之后会调用其postProcessBeanFactory方法完成ReferenceBean的BeanDefinition注册,代码如下

代码语言:javascript
复制
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        Class<?> beanType;
        if (beanFactory.isFactoryBean(beanName)) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
            if (isReferenceBean(beanDefinition)) {
                continue;
            }
            if (isAnnotatedReferenceBean(beanDefinition)) {
                // 处理@DubboReference和@Bean一起使用的场景,如下
                // @Bean
                // @DubboReference(scope = "remote", version = "1.0.0", group = "group1")
                // public ReferenceBean<DemoService> demoService() {
                //     return new ReferenceBean<>();
                // }
                // 这种情况已经注册了该ReferenceBean,所以processReferenceAnnotatedBeanDefinition就
                // 不需要再重复注册,只需要做一些其他的信息设置的工作
                processReferenceAnnotatedBeanDefinition(beanName, (AnnotatedBeanDefinition) beanDefinition);
                continue;
            }
            String beanClassName = beanDefinition.getBeanClassName();
            beanType = ClassUtils.resolveClass(beanClassName, getClassLoader());
        } else {
            beanType = beanFactory.getType(beanName);
        }
        if (beanType != null) {
            // 下面处理通过字段注入的场景,如下
            //    @DubboReference(scope = "remote", version = "1.0.0", group = "group1")
            //    private DemoService demoService;
            // 这种场景需要找到带有@DubboReference注解的字段信息,在prepareInjection方法中注册一个
            // ReferenceBean   
            AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
            try {
                prepareInjection(metadata);
            } catch (BeansException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException("Prepare dubbo reference injection element failed", e);
            }
        }
    }

    ......

    try {
        // 发布一个DubboConfigInitEvent事件
        applicationContext.publishEvent(new DubboConfigInitEvent(applicationContext));
    } catch (Exception e) {
        logger.warn(CONFIG_DUBBO_BEAN_INITIALIZER, "", "", "publish early application event failed, please upgrade spring version to 4.2.x or later: " + e);
    }
}

注册完成了之后发布一个DubboConfigInitEvent事件,看看这个事件对应的Listener的逻辑

代码语言:javascript
复制
public class DubboConfigApplicationListener implements ApplicationListener<DubboConfigInitEvent>, ApplicationContextAware {

    private ApplicationContext applicationContext;

    private ModuleModel moduleModel;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
    }
    @Override
    public void onApplicationEvent(DubboConfigInitEvent event) {
        if (nullSafeEquals(applicationContext, event.getSource())) {
            // It's expected to be notified at org.springframework.context.support.AbstractApplicationContext.registerListeners(),
            // before loading non-lazy singleton beans. At this moment, all BeanFactoryPostProcessor have been processed,
            if (initialized.compareAndSet(false, true)) {
                initDubboConfigBeans();
            }
        }
    }
    private void initDubboConfigBeans() {
        // load DubboConfigBeanInitializer to init config beans
        if (applicationContext.containsBean(DubboConfigBeanInitializer.BEAN_NAME)) {
            applicationContext.getBean(DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);
        } else {
            logger.warn(CONFIG_DUBBO_BEAN_NOT_FOUND, "", "", "Bean '" + DubboConfigBeanInitializer.BEAN_NAME + "' was not found");
        }
        // All infrastructure config beans are loaded, initialize dubbo here
        moduleModel.getDeployer().prepare();
    }
}

最终是通过DubboConfigBeanInitializer来做初始化,DubboConfigBeanInitializer实现了InitializingBean接口,afterPropertiesSet方法中的Init方法最终调用initReferenceBean将referenceBean添加到ConfigManager中

代码语言:javascript
复制
private synchronized void initReferenceBean(ReferenceBean referenceBean) throws Exception {
    ......
    // 获取referenceKey 
    String referenceKey = getReferenceKeyByBeanName(referenceBean.getId());
    if (StringUtils.isEmpty(referenceKey)) {
        referenceKey = ReferenceBeanSupport.generateReferenceKey(referenceBean, applicationContext);
    }
    ......
    ReferenceConfig referenceConfig = referenceConfigMap.get(referenceKey);
    if (referenceConfig == null) {
        
        // 放入缓存
        referenceConfigMap.put(referenceKey, referenceConfig);

        // 和provider一样,将referenceBean添加到ConfigManager中
        moduleModel.getConfigManager().addReference(referenceConfig);
    }
    ......
}

3 DubboDeployApplicationListener

DubboDeployApplicationListener主要用来处理Provider的服务发布和消费端的服务引用(上面提到过),其实现了ApplicationListener<ApplicationContextEvent>接口,主要监听ApplicationContext相关的各种事件,比如Spring启动完成后会发布一个ContextRefreshedEvent事件

代码语言:javascript
复制
public void onApplicationEvent(ApplicationContextEvent event) {
    if (nullSafeEquals(applicationContext, event.getSource())) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
}

同样DubboDeployApplicationListener也实现了ApplicationContextAware接口,实例化后通过setApplicationContext方法将ModuleModel设置到自己的属性当中用户后续服务发布

代码语言:javascript
复制
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    ......
    this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
    ......
}

DubboDeployApplicationListener监听到ContextRefreshedEvent事件之后,调用onContextRefreshedEvent方法进行服务发布

代码语言:javascript
复制
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    // 通过ModuleModel获取发布器
    ModuleDeployer deployer = moduleModel.getDeployer();
    Assert.notNull(deployer, "Module deployer is null");
    // 开始服务发布的流程
    Future future = deployer.start();
    ......
}

最终调用DefaultModuleDeployer.startSync方法处理

代码语言:javascript
复制
private synchronized Future startSync() throws IllegalStateException {
    ......
    try {
        // new了一个CompletableFuture来以获取start的结果,并且发布一个start事件
        onModuleStarting();
        ......
        // 发布服务
        exportServices();

        // 服务引用
        referServices();
        ......

        // 下面都是根据不同的发布结果,发布不同的事件的处理
        if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
            onModuleStarted();
        } else {
            frameworkExecutorRepository.getSharedExecutor().submit(() -> {
                try {
                    // wait for export finish
                    waitExportFinish();
                    // wait for refer finish
                    waitReferFinish();
                } catch (Throwable e) {
                    logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "", "wait for export/refer services occurred an exception", e);
                } finally {
                    onModuleStarted();
                }
            });
        }
    } catch (Throwable e) {
        onModuleFailed(getIdentifier() + " start failed: " + e, e);
        throw e;
    }
    return startFuture;
}

其中exportServices是发布服务,之前的文章中已经分析过,referServices是服务引用,可异步可同步,如下

代码语言:javascript
复制
private void referServices() {
    configManager.getReferences().forEach(rc -> {
        try {
            ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
            if (!referenceConfig.isRefreshed()) {
                referenceConfig.refresh();
            }

            if (rc.shouldInit()) {
                if (referAsync || rc.shouldReferAsync()) {
                    ExecutorService executor = executorRepository.getServiceReferExecutor();
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        try {
                            referenceCache.get(rc);
                        } catch (Throwable t) {
                            logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
                        }
                    }, executor);

                    asyncReferringFutures.add(future);
                } else {
                    referenceCache.get(rc);
                }
            }
        } catch (Throwable t) {
            logger.error(CONFIG_FAILED_REFERENCE_MODEL, "", "", "Model reference failed: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
            referenceCache.destroy(rc);
            throw t;
        }
    });
}

本质来说就是调用referenceCache.get方法,看看实现

代码语言:javascript
复制
public <T> T get(ReferenceConfigBase<T> rc) {
    String key = generator.generateKey(rc);
    Class<?> type = rc.getInterfaceClass();

    boolean singleton = rc.getSingleton() == null || rc.getSingleton();
    T proxy = null;
    // 先从缓存里面去取,其实就是根据key到referenceKeyMap中获取
    if (singleton) {
        proxy = get(key, (Class<T>) type);
    } else {
        logger.warn(CONFIG_API_WRONG_USE, "", "", "Using non-singleton ReferenceConfig and ReferenceCache at the same time may cause memory leak. " +
            "Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");
    }

    if (proxy == null) {
        // 缓存中没有,再调用rc.get方法获取
        List<ReferenceConfigBase<?>> referencesOfType = ConcurrentHashMapUtils.computeIfAbsent(referenceTypeMap, type, _t -> Collections.synchronizedList(new ArrayList<>()));
        referencesOfType.add(rc);
        List<ReferenceConfigBase<?>> referenceConfigList = ConcurrentHashMapUtils.computeIfAbsent(referenceKeyMap, key, _k -> Collections.synchronizedList(new ArrayList<>()));
        referenceConfigList.add(rc);
        proxy = rc.get();
    }

    return proxy;
}

先从缓存里面去取,其实就是根据key到referenceKeyMap中获取,缓存中没有,再调用rc.get方法获取,看下rc.get方法

代码语言:javascript
复制
public T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }

    if (ref == null) {
        // ensure start module, compatible with old api usage
        getScopeModel().getDeployer().start();

        init();
    }

    return ref;
}

protected synchronized void init() {
    if (initialized && ref != null) {
        return;
    }
    try {
        ......

        ref = createProxy(referenceParameters);

        ......
    } catch (Throwable t) {
        // 异常处理逻辑
        ......
    }
    initialized = true;
}

rc.get方法中调用init方法,然后调用createProxy方法生成代理对象,

代码语言:javascript
复制
private T createProxy(Map<String, String> referenceParameters) {
    ......
    createInvoker();
    ......
    // 创建代理对象,默认使用javassist
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

而代理的invoker则是由createInvoker方法生成

代码语言:javascript
复制
private void createInvoker() {
    if (urls.size() == 1) {
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
        // registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.
        if (!UrlUtils.isRegistry(curUrl) &&
                !curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {
            List<Invoker<?>> invokers = new ArrayList<>();
            invokers.add(invoker);
            invoker = Cluster.getCluster(getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
        }
    } else {
        List<Invoker<?>> invokers = new ArrayList<>();
        URL registryUrl = null;
        for (URL url : urls) {
            // For multi-registry scenarios, it is not checked whether each referInvoker is available.
            // Because this invoker may become available later.
            invokers.add(protocolSPI.refer(interfaceClass, url));

            if (UrlUtils.isRegistry(url)) {
                // use last registry url
                registryUrl = url;
            }
        }

        if (registryUrl != null) {
            // registry url is available
            // for multi-subscription scenario, use 'zone-aware' policy by default
            String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
            // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
            // (RegistryDirectory, routing happens here) -> Invoker
            invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
        } else {
            // not a registry url, must be direct invoke.
            if (CollectionUtils.isEmpty(invokers)) {
                throw new IllegalArgumentException("invokers == null");
            }
            URL curUrl = invokers.get(0).getUrl();
            String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
            invoker = Cluster.getCluster(getScopeModel(), cluster).join(new StaticDirectory(curUrl, invokers), true);
        }
    }
}

和provider类似,通过protocolSPI.refer方法来处理,protocolSPI第一步获取类型为registry的protocol,获取到的其实是一个链表,链表中的每个元素依次进行refer

ProtocolSPIChain
ProtocolSPIChain

直接看最后一个元素InterfaceCompatibleRegistryProtocol的refer方法,这个类继承自RegistryProtocol,其实是调用RegistryProtocol的refer方法

代码语言:javascript
复制
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);
    if (StringUtils.isNotEmpty(group)) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
        }
    }

    Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
    return doRefer(cluster, registry, type, url, qs);
}

核心代码就最后两行,获取Cluster,默认是FailoverCluster,调用deRefer方法

代码语言:javascript
复制
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
    Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
    consumerAttribute.remove(REFER_KEY);
    String p = isEmpty(parameters.get(PROTOCOL_KEY)) ? CONSUMER : parameters.get(PROTOCOL_KEY);
    URL consumerUrl = new ServiceConfigURL(
        p,
        null,
        null,
        parameters.get(REGISTER_IP_KEY),
        0, getPath(parameters, type),
        parameters,
        consumerAttribute
    );
    url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
    ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
    return interceptInvoker(migrationInvoker, url, consumerUrl);
}

doRefer方法中首先获取Invoker,获取到的是一个MigrationInvoker的实例,属性详情如下

MigrationInvoker
MigrationInvoker

然后调用interceptInvoker方法对Invoker做后续处理

代码语言:javascript
复制
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }

    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, invoker, consumerUrl, url);
    }
    return invoker;
}

在interceptInvoker方法中找到可用的RegistryProtocolListener列表,依次执行RegistryProtocolListener的onRefer方法

可用的就一个MigrationRuleListener,看下它的onRefer方法

代码语言:javascript
复制
public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {
    MigrationRuleHandler<?> migrationRuleHandler = ConcurrentHashMapUtils.computeIfAbsent(handlers, (MigrationInvoker<?>) invoker, _key -> {
        ((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);
        return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
    });

    migrationRuleHandler.doMigrate(rule);
}

以MigrationInvoker和consumerUrl为入参构造了一个migrationRuleHandler,然后调用doMigrate方法,doMigrate

调用refreshInvoker方法处理

代码语言:javascript
复制
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
    if (step == null || threshold == null) {
        throw new IllegalStateException("Step or threshold of migration rule cannot be null");
    }
    MigrationStep originStep = currentStep;

    if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
        boolean success = true;
        switch (step) {
            case APPLICATION_FIRST:
                migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                break;
            case FORCE_APPLICATION:
                success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                break;
            case FORCE_INTERFACE:
            default:
                success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
        }

        if (success) {
            setCurrentStepAndThreshold(step, threshold);
            logger.info("Succeed Migrated to " + step + " mode. Service Name: " + consumerURL.getDisplayServiceKey());
            report(step, originStep, "true");
        } else {
            // migrate failed, do not save new step and rule
            logger.warn(INTERNAL_ERROR, "unknown error in registry module", "", "Migrate to " + step + " mode failed. Probably not satisfy the threshold you set "
                + threshold + ". Please try re-publish configuration if you still after check.");
            report(step, originStep, "false");
        }

        return success;
    }
    // ignore if step is same with previous, will continue override rule for MigrationInvoker
    return true;

最后调用了migrationInvoker.migrateToApplicationFirstInvoker方法

代码语言:javascript
复制
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
    CountDownLatch latch = new CountDownLatch(0);
    refreshInterfaceInvoker(latch);
    refreshServiceDiscoveryInvoker(latch);

    // directly calculate preferred invoker, will not wait until address notify
    // calculation will re-occurred when address notify later
    calcPreferredInvoker(newRule);
}

详细看看refreshInterfaceInvoker方法的逻辑

代码语言:javascript
复制
protected void refreshInterfaceInvoker(CountDownLatch latch) {
    clearListener(invoker);
    if (needRefresh(invoker)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Re-subscribing interface addresses for interface " + type.getName());
        }

        if (invoker != null) {
            invoker.destroy();
        }
        invoker = registryProtocol.getInvoker(cluster, registry, type, url);
    }
    setListener(invoker, () -> {
        latch.countDown();
        if (reportService.hasReporter()) {
            reportService.reportConsumptionStatus(
                reportService.createConsumptionReport(consumerUrl.getServiceInterface(), consumerUrl.getVersion(), consumerUrl.getGroup(), "interface"));
        }
        if (step == APPLICATION_FIRST) {
            calcPreferredInvoker(rule);
        }
    });
}

该方法中主要就是设置了migrationInvoker的invoker属性,通过registryProtocol.getInvoker获取,这里的registryProtocol就是

上文中提到过了InterfaceCompatibleRegistryProtocol

代码语言:javascript
复制
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
    DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
    return doCreateInvoker(directory, cluster, registry, type);
}

getInvoker中又继续调用父类RegistryProtocol.doCreateInvoker方法

代码语言:javascript
复制
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
    URL urlToRegistry = new ServiceConfigURL(
        parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
        parameters.remove(REGISTER_IP_KEY),
        0,
        getPath(parameters, type),
        parameters
    );
    urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
    urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
    if (directory.isShouldRegister()) {
        // 将消费者信息写入注册中心
        directory.setRegisteredConsumerUrl(urlToRegistry);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(urlToRegistry);
    // 生成所有提供者的FilterInvoker,此处的Filter是针对单个invoker的Filter(最底层的是DubboInvoker)
    directory.subscribe(toSubscribeUrl(urlToRegistry));
    // 生成集群级别的FilterInvoker,包含了大部分默认的Filter
    return (ClusterInvoker<T>) cluster.join(directory, true);
}

详细看看生成提供者的FilterInvoker的过程,通过directory.subscribe方法

代码语言:javascript
复制
public void subscribe(URL url) {
    ApplicationModel applicationModel = url.getApplicationModel();
    MetricsEventBus.post(RegistryEvent.toSubscribeEvent(applicationModel),() ->
        {
            super.subscribe(url);
            return null;
        }
    );
    if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
        consumerConfigurationListener.addNotifyListener(this);
        referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);
    }
}

核心逻辑是通过调用super.subscribe方法来处理

代码语言:javascript
复制
public void subscribe(URL url) {
    setSubscribeUrl(url);
    registry.subscribe(url, this);
}

接着看registry.subscribe方法,入参listener为最初的调用者directory

代码语言:javascript
复制
public void subscribe(URL url, NotifyListener listener) {
    try {
        if (registry != null) {
            // 在这里生成所有提供者的FilterInvoker,并且监听提供者的的变化
            // 这里的registry是一个ZookeeperRegistry实例
            registry.subscribe(url, listener);
        }
    } finally {
        listenerEvent(serviceListener -> serviceListener.onSubscribe(url, registry));
    }
}

最后调用ZookeeperRegistry类的doSubscribe方法

代码语言:javascript
复制
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        checkDestroyed();
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            // 由于我本地跑的都是具体的interface,所以不会走到这个分支
        } else {
            CountDownLatch latch = new CountDownLatch(1);

            try {
                List<URL> urls = new ArrayList<>();

                /*
                    Iterate over the category value in URL.
                    With default settings, the path variable can be when url is a consumer URL:

                        /dubbo/[service name]/providers,
                        /dubbo/[service name]/configurators
                        /dubbo/[service name]/routers
                */
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = ConcurrentHashMapUtils.computeIfAbsent(zkListeners, url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = ConcurrentHashMapUtils.computeIfAbsent(listeners, listener, k -> new RegistryChildListenerImpl(url, k, latch));

                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }

                    // create "directories".
                    zkClient.create(path, false, true);

                    // 添加提供者的监听器,并且获取到所有的提供者添加到urls中
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // The invocation point that may cause 1-1.
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 生成提供者的Invoker
                notify(url, listener, urls);
            } finally {
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();
            }
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

先注册了一个zkListener,zkListener为一个RegistryChildListenerImpl的实例

代码语言:javascript
复制
private class RegistryChildListenerImpl implements ChildListener {
    ......
    @Override
    public void childChanged(String path, List<String> children) {
        // Notify 'notifiers' one by one.
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
        }

        notifier.notify(path, children);
    }
}

当提供者发生变化时,通过childChanged方法调用notifier.notify最终调用到以下的notify方法更新invoker,并且在注册完zkListener后也是调用这个方法生成提供者的invoker

代码语言:javascript
复制
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // Record a failed registration request to a failed list
        logger.error(REGISTRY_FAILED_NOTIFY_EVENT, "", "", "Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
    }
}

最终又调回listener的notify方法,并且通过listener的refreshOverrideAndInvoker方法刷新invoker

代码语言:javascript
复制
protected synchronized void refreshOverrideAndInvoker(List<URL> urls) {
    // mock zookeeper://xxx?mock=return null
    this.directoryUrl = overrideWithConfigurator(getOriginalConsumerUrl());
    refreshInvoker(urls);
}

至此directory.subscribe(toSubscribeUrl(urlToRegistry))方法就执行完了,最后再通过cluster.join(directory,true)方法生成ClusterInvoker,本质是一个ClusterFilterInvoker,在其构造方法中构造集群级别的Filter链

代码语言:javascript
复制
public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
    List<FilterChainBuilder> builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
    if (CollectionUtils.isEmpty(builders)) {
        filterInvoker = invoker;
    } else {
        ClusterInvoker<T> tmpInvoker = invoker;
        for (FilterChainBuilder builder : builders) {
            tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
        }
        filterInvoker = tmpInvoker;
    }
}

最后再来看看生成单个provider的invoker的逻辑

代码语言:javascript
复制
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {
    Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        ......
        Invoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);
        if (invoker == null) { // Not in the cache, refer again
            try {
                ......
                if (enabled) {
                    // 核心代码就这一行
                    invoker = protocol.refer(serviceType, url);
                }
            } catch (Throwable t) {
                // 异常处理逻辑省略
                ......
            }
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(url, invoker);
            }
        } else {
            newUrlInvokerMap.put(url, invoker);
        }
    }
    return newUrlInvokerMap;
}

核心代码就一行调用protocol.refer方法处理,这里的protocol其实是也是一个适配工具类,即上文中提到的protocolSPI,所以第一步也是去获取protocol的处理链表,只是这次获取的是类型为dubbo的protocol,如下

ProtocolSPIChain
ProtocolSPIChain

最后到DubboProtocol的refer方法处理

代码语言:javascript
复制
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    checkDestroyed();
    return protocolBindingRefer(type, url);
}

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    checkDestroyed();
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

新建了一个DubboInvoker,其构造函数第三个入参是根据url生成的一个NettyClient(具体生成的逻辑就不看了,比较简单)

最后再来看看最终生成的Invoker的结构,最外层是一个MigrationInvoker,最里层是一个带有Filter的集群级别的FilterInvoker

MigrationInvoker
MigrationInvoker

FilterInvoker有一个originalInvoker属性,看看它的结构

FilterInvoker
FilterInvoker

FilterInvoker的directory属性中包含了所有提供者的invoker,最里层是一个DubboInvoker

自此Consume端的启动完成

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 @EnableDubbo
  • 2 ReferenceAnnotationBeanPostProcessor
    • 3 DubboDeployApplicationListener
    相关产品与服务
    微服务引擎 TSE
    微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档