
Dubbo Series, Part 1: Provider Startup

Original article by 用户9511949, published 2024-07-15 16:45:35

1 @EnableDubbo

Start with the demo launcher class in the Dubbo source:

@SpringBootApplication
@EnableDubbo(scanBasePackages = {"org.apache.dubbo.springboot.demo.provider"})
public class ProviderApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ProviderApplication.class, args);
        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

The application starts through the @EnableDubbo annotation, which combines @EnableDubboConfig and @DubboComponentScan. These two annotations import DubboConfigConfigurationRegistrar and DubboComponentScanRegistrar respectively:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {......}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {......}

Both DubboConfigConfigurationRegistrar and DubboComponentScanRegistrar register custom BeanDefinitions with the BeanDefinitionRegistry while it is loading BeanDefinitions. As the code below shows, both call DubboSpringInitializer.initialize to initialize the DubboSpringInitContext (skipped if one already exists). DubboComponentScanRegistrar has one extra step, registerServiceAnnotationPostProcessor, which registers a ServiceAnnotationPostProcessor with the BeanDefinitionRegistry.

public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // initialize dubbo beans
        DubboSpringInitializer.initialize(registry);
    }
}

public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // initialize dubbo beans
        DubboSpringInitializer.initialize(registry);
        Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
        registerServiceAnnotationPostProcessor(packagesToScan, registry);
    }
}

DubboSpringInitializer.initialize eventually calls DubboBeanUtils.registerCommonBeans, which registers a set of Dubbo infrastructure beans:

static void registerCommonBeans(BeanDefinitionRegistry registry) {

    registerInfrastructureBean(registry, ServicePackagesHolder.BEAN_NAME, ServicePackagesHolder.class);

    registerInfrastructureBean(registry, ReferenceBeanManager.BEAN_NAME, ReferenceBeanManager.class);

    // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
    registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
        ReferenceAnnotationBeanPostProcessor.class);

    // TODO Whether DubboConfigAliasPostProcessor can be removed ?
    // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
    registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
        DubboConfigAliasPostProcessor.class);

    // register ApplicationListeners
    registerInfrastructureBean(registry, DubboDeployApplicationListener.class.getName(), DubboDeployApplicationListener.class);
    registerInfrastructureBean(registry, DubboConfigApplicationListener.class.getName(), DubboConfigApplicationListener.class);

    // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
    registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
        DubboConfigDefaultPropertyValueBeanPostProcessor.class);

    // Dubbo config initializer
    registerInfrastructureBean(registry, DubboConfigBeanInitializer.BEAN_NAME, DubboConfigBeanInitializer.class);

    // register infra bean if not exists later
    registerInfrastructureBean(registry, DubboInfraBeanRegisterPostProcessor.BEAN_NAME, DubboInfraBeanRegisterPostProcessor.class);
}

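Two of these classes are worth noting now; they are analyzed in detail later:

ReferenceAnnotationBeanPostProcessor: the processor that handles service consumers

DubboDeployApplicationListener: the listener that publishes the Provider's services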

2 ServiceAnnotationPostProcessor

The main job of ServiceAnnotationPostProcessor is to scan the scanBasePackages configured on @EnableDubbo for services that should be exposed as Dubbo services. It implements BeanDefinitionRegistryPostProcessor, which is a sub-interface of BeanFactoryPostProcessor.

BeanFactoryPostProcessor and BeanDefinitionRegistryPostProcessor define postProcessBeanFactory and postProcessBeanDefinitionRegistry respectively. postProcessBeanFactory runs before beans are instantiated, and postProcessBeanDefinitionRegistry runs before postProcessBeanFactory. Here is how ServiceAnnotationPostProcessor implements them:

// postProcessBeanDefinitionRegistry is simple: it scans with DubboClassPathBeanDefinitionScanner.
// DubboClassPathBeanDefinitionScanner extends Spring's ClassPathBeanDefinitionScanner,
// so the scan ultimately relies on Spring's own utility class.
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    this.registry = registry;
    scanServiceBeans(resolvedPackagesToScan, registry);
}

// postProcessBeanFactory first handles @Bean combined with @DubboService, which
// DubboClassPathBeanDefinitionScanner cannot find, and registers those BeanDefinitions through
// processAnnotatedBeanDefinition. It then checks whether a scan is still needed: older Spring
// versions may not call postProcessBeanDefinitionRegistry, so the scan has to happen here.
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    if (this.registry == null) {
        // In spring 3.x, may be not call postProcessBeanDefinitionRegistry()
        this.registry = (BeanDefinitionRegistry) beanFactory;
    }
    // scan bean definitions
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
        Map<String, Object> annotationAttributes = getServiceAnnotationAttributes(beanDefinition);
        if (annotationAttributes != null) {
            // process @DubboService at java-config @bean method
            processAnnotatedBeanDefinition(beanName, (AnnotatedBeanDefinition) beanDefinition, annotationAttributes);
        }
    }

    if (!scanned) {
        // In spring 3.x, may be not call postProcessBeanDefinitionRegistry(), so scan service class here
        scanServiceBeans(resolvedPackagesToScan, registry);
    }
}
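
The ordering and the `scanned` fallback can be illustrated without Spring. The following is a minimal sketch using plain JDK types; every interface and class name in it is hypothetical and only stands in for the Spring and Dubbo types discussed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spring's two callback interfaces (not the real Spring types).
interface FactoryPostProcessorSketch {
    void postProcessBeanFactory(List<String> log);
}

interface RegistryPostProcessorSketch extends FactoryPostProcessorSketch {
    void postProcessBeanDefinitionRegistry(List<String> log);
}

class ScanningProcessorSketch implements RegistryPostProcessorSketch {
    private boolean scanned;

    @Override
    public void postProcessBeanDefinitionRegistry(List<String> log) {
        scan(log);
    }

    @Override
    public void postProcessBeanFactory(List<String> log) {
        log.add("postProcessBeanFactory");
        if (!scanned) {
            // Fallback for a container that never invoked the registry callback.
            scan(log);
        }
    }

    private void scan(List<String> log) {
        scanned = true;
        log.add("scan");
    }
}

class PostProcessorOrderDemo {
    // Mimics the container: the registry callback (if supported) runs before the factory callback.
    static List<String> run(boolean callRegistryPhase) {
        List<String> log = new ArrayList<>();
        ScanningProcessorSketch processor = new ScanningProcessorSketch();
        if (callRegistryPhase) {
            processor.postProcessBeanDefinitionRegistry(log);
        }
        processor.postProcessBeanFactory(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // registry phase available
        System.out.println(run(false));  // registry phase skipped, scan happens in the fallback
    }
}
```

Either way the scan runs exactly once; only its position relative to postProcessBeanFactory changes.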

Here is the scanServiceBeans method, trimmed to its core logic:

private void scanServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {

    scanned = true;
    ......
    DubboClassPathBeanDefinitionScanner scanner =
            new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
    ......
    for (String packageToScan : packagesToScan) {

        // Registers @Service Bean first
        scanner.scan(packageToScan);

        // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
        Set<BeanDefinitionHolder> beanDefinitionHolders =
                findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);

        if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
            ......
            for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                processScannedBeanDefinition(beanDefinitionHolder);
                ......
            }
        } else {
            ......
        }

        ......
    }
}

The method scans each package with DubboClassPathBeanDefinitionScanner and then registers the results through processScannedBeanDefinition. Like processAnnotatedBeanDefinition, it ends up building a ServiceBean BeanDefinition with buildServiceBeanDefinition and registering it with the registry. The core code:

private AbstractBeanDefinition buildServiceBeanDefinition(Map<String, Object> serviceAnnotationAttributes,
                                                          String serviceInterface,
                                                          String refServiceBeanName) {
    BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
    // The rest is repetitive property setup, omitted
    ......
}

private void registerServiceBeanDefinition(String serviceBeanName, AbstractBeanDefinition serviceBeanDefinition, String serviceInterface) {
    // check service bean
    if (registry.containsBeanDefinition(serviceBeanName)) {
        BeanDefinition existingDefinition = registry.getBeanDefinition(serviceBeanName);
        if (existingDefinition.equals(serviceBeanDefinition)) {
            // exist equipment bean definition
            return;
        }
        ......
    }
    registry.registerBeanDefinition(serviceBeanName, serviceBeanDefinition);
    ......
}

ServiceBean implements the InitializingBean interface, so afterPropertiesSet runs once the instance has been initialized:

public void afterPropertiesSet() throws Exception {
    if (StringUtils.isEmpty(getPath())) {
        if (StringUtils.isNotEmpty(getInterface())) {
            setPath(getInterface());
        }
    }
    // Get the ModuleModel instance from the applicationContext; it drives service publishing.
    // Later publishing steps also obtain it through DubboBeanUtils.getModuleModel(applicationContext).
    ModuleModel moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
    moduleModel.getConfigManager().addService(this);
    moduleModel.getDeployer().setPending();
}

In short, the instance adds itself to the ModuleModel's ConfigManager. This completes the scanning and registration of services.
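
As a rough illustration of this "collect now, export later" pattern, here is a minimal sketch in plain Java. `ConfigManagerSketch` and `ServiceBeanSketch` are hypothetical stand-ins, not the Dubbo classes, and only the two steps visible in afterPropertiesSet are modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ConfigManager held by ModuleModel.
class ConfigManagerSketch {
    private final List<ServiceBeanSketch> services = new ArrayList<>();

    void addService(ServiceBeanSketch service) {
        services.add(service);
    }

    List<ServiceBeanSketch> getServices() {
        return services;
    }
}

// Hypothetical stand-in for ServiceBean; only the fields used by afterPropertiesSet are modelled.
class ServiceBeanSketch {
    private final String interfaceName;
    private String path;

    ServiceBeanSketch(String interfaceName, String path) {
        this.interfaceName = interfaceName;
        this.path = path;
    }

    String getPath() {
        return path;
    }

    // Same two steps as the listing above: default the path, then hand the instance to the manager.
    void afterPropertiesSet(ConfigManagerSketch configManager) {
        if (path == null || path.isEmpty()) {
            if (interfaceName != null && !interfaceName.isEmpty()) {
                path = interfaceName;
            }
        }
        configManager.addService(this);
    }
}

class ServiceCollectDemo {
    public static void main(String[] args) {
        ConfigManagerSketch manager = new ConfigManagerSketch();
        new ServiceBeanSketch("org.demo.GreetingService", null).afterPropertiesSet(manager);
        new ServiceBeanSketch("org.demo.OrderService", "orders").afterPropertiesSet(manager);
        for (ServiceBeanSketch service : manager.getServices()) {
            System.out.println(service.getPath());
        }
    }
}
```

Nothing is exported at this point; the manager only holds the instances until the deployer picks them up.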

3 DubboDeployApplicationListener

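DubboDeployApplicationListener, mentioned above, handles publishing the Provider's services. It implements ApplicationListener<ApplicationContextEvent> and therefore listens to ApplicationContext events; for example, Spring publishes a ContextRefreshedEvent once startup has completed.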

public void onApplicationEvent(ApplicationContextEvent event) {
    if (nullSafeEquals(applicationContext, event.getSource())) {
        if (event instanceof ContextRefreshedEvent) {
            onContextRefreshedEvent((ContextRefreshedEvent) event);
        } else if (event instanceof ContextClosedEvent) {
            onContextClosedEvent((ContextClosedEvent) event);
        }
    }
}
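
The dispatch rule in onApplicationEvent (react only to events whose source is the listener's own context, then branch on the event type) can be sketched without Spring. All names below are hypothetical, and the action strings are placeholders for the real handlers.

```java
import java.util.Objects;

// Hypothetical event kinds standing in for ContextRefreshedEvent and ContextClosedEvent.
enum ContextEventKind { REFRESHED, CLOSED }

class DeployListenerSketch {
    private final Object applicationContext;
    private String lastAction = "none";

    DeployListenerSketch(Object applicationContext) {
        this.applicationContext = applicationContext;
    }

    void onApplicationEvent(Object source, ContextEventKind kind) {
        // Events published by any other context are ignored.
        if (!Objects.equals(applicationContext, source)) {
            return;
        }
        if (kind == ContextEventKind.REFRESHED) {
            lastAction = "onContextRefreshedEvent";
        } else if (kind == ContextEventKind.CLOSED) {
            lastAction = "onContextClosedEvent";
        }
    }

    String getLastAction() {
        return lastAction;
    }

    public static void main(String[] args) {
        Object ownContext = new Object();
        DeployListenerSketch listener = new DeployListenerSketch(ownContext);
        listener.onApplicationEvent(new Object(), ContextEventKind.REFRESHED);
        System.out.println(listener.getLastAction());
        listener.onApplicationEvent(ownContext, ContextEventKind.REFRESHED);
        System.out.println(listener.getLastAction());
    }
}
```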

DubboDeployApplicationListener also implements ApplicationContextAware. After instantiation, setApplicationContext stores the ModuleModel in a field for use in the later publishing steps:

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    ......
    this.moduleModel = DubboBeanUtils.getModuleModel(applicationContext);
    ......
}

When DubboDeployApplicationListener receives the ContextRefreshedEvent, it calls onContextRefreshedEvent to publish the services:

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
    // Get the deployer from the ModuleModel
    ModuleDeployer deployer = moduleModel.getDeployer();
    Assert.notNull(deployer, "Module deployer is null");
    // Start the service publishing flow
    Future future = deployer.start();
    ......
}

deployer.start() eventually reaches DefaultModuleDeployer.startSync:

private synchronized Future startSync() throws IllegalStateException {
    ......
    try {
        // Creates a CompletableFuture for retrieving the start result and publishes a starting event
        onModuleStarting();
        ......
        // export services
        exportServices();

        // refer services
        referServices();
        ......

        // The rest publishes different events depending on the outcome of the export
        if (asyncExportingFutures.isEmpty() && asyncReferringFutures.isEmpty()) {
            onModuleStarted();
        } else {
            frameworkExecutorRepository.getSharedExecutor().submit(() -> {
                try {
                    // wait for export finish
                    waitExportFinish();
                    // wait for refer finish
                    waitReferFinish();
                } catch (Throwable e) {
                    logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "", "wait for export/refer services occurred an exception", e);
                } finally {
                    onModuleStarted();
                }
            });
        }
    } catch (Throwable e) {
        onModuleFailed(getIdentifier() + " start failed: " + e, e);
        throw e;
    }
    return startFuture;
}
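
The tail of startSync follows a common pattern: finish immediately when nothing is pending, otherwise wait on a shared executor and signal "started" in a finally block. Below is a minimal sketch of that pattern with `CompletableFuture`; `StartFlowSketch` and its `start` method are hypothetical, and completing the returned future stands in for onModuleStarted().

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StartFlowSketch {
    // Returns a future that completes once every pending asynchronous export has finished.
    static CompletableFuture<Boolean> start(List<CompletableFuture<Void>> asyncExports,
                                            ExecutorService sharedExecutor) {
        CompletableFuture<Boolean> startFuture = new CompletableFuture<>();
        if (asyncExports.isEmpty()) {
            // Nothing pending: report "started" on the calling thread.
            startFuture.complete(true);
        } else {
            sharedExecutor.submit(() -> {
                try {
                    // Wait for all asynchronous exports.
                    CompletableFuture.allOf(asyncExports.toArray(new CompletableFuture[0])).join();
                } catch (Throwable e) {
                    // The original listing only logs a warning here; the sketch ignores the failure.
                } finally {
                    startFuture.complete(true);
                }
            });
        }
        return startFuture;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> slowExport = CompletableFuture.runAsync(() -> pause(50));
            System.out.println(start(Collections.<CompletableFuture<Void>>emptyList(), shared).isDone());
            System.out.println(start(Collections.singletonList(slowExport), shared).join());
        } finally {
            shared.shutdown();
        }
    }
}
```

Because the completion sits in the finally block, a failed asynchronous export does not prevent the "started" signal in this sketch, which mirrors the structure of the listing above.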

The key step, exportServices, collects all ServiceBean instances and calls the export() method of their parent class ServiceConfig on each one, either synchronously or asynchronously:

private void exportServiceInternal(ServiceConfigBase sc) {
    ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
    if (!serviceConfig.isRefreshed()) {
        serviceConfig.refresh();
    }
    if (sc.isExported()) {
        return;
    }
    if (exportAsync || sc.shouldExportAsync()) {
        ExecutorService executor = executorRepository.getServiceExportExecutor();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                if (!sc.isExported()) {
                    sc.export();
                    exportedServices.add(sc);
                }
            } catch (Throwable t) {
                logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
            }
        }, executor);

        asyncExportingFutures.add(future);
    } else {
        if (!sc.isExported()) {
            sc.export();
            exportedServices.add(sc);
        }
    }
}

We will not walk through ServiceConfig.export() in detail; it ends up calling doExportUrl below to publish the service:

private void doExportUrl(URL url, boolean withMetaData) {
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    Exporter<?> exporter = protocolSPI.export(invoker);
    exporters.add(exporter);
}

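First look at the core attributes of the first parameter, the URL. The registration mode is set to interface mode here. Interface mode is the 2.x behavior and 3.x adds an instance mode; it is easier to understand the 2.x flow first, since 3.x essentially just writes more information to the registry.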

[Figure: URL]

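The core attributes are urlAddress and the export entry in attributes: the registry address and the service to export, respectively. The rest of the flow uses proxyFactory and protocolSPI, which are adaptive helper classes for proxies and protocols. Both are generated dynamically, and their job is to look up the matching Extension for the concrete protocol or proxy type. The generated code: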

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.ProxyFactory.class);
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy()  {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort()  {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
    public java.util.List getServers()  {
        throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(url.getScopeModel(), org.apache.dubbo.rpc.Protocol.class);
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)scopeModel.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
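
Stripped of the generated boilerplate, both adaptive classes do the same thing: read an extension name from the URL, fall back to a default, look up the implementation, and delegate. The sketch below shows that idea with a plain map; `ProxyKindSketch`, `AdaptiveProxyKindSketch` and the `Map` used in place of a Dubbo URL are all assumptions made for illustration, and the real lookup goes through ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extension point; real extension points such as ProxyFactory have richer signatures.
interface ProxyKindSketch {
    String describe();
}

class AdaptiveProxyKindSketch {
    private final Map<String, ProxyKindSketch> extensions = new HashMap<>();

    AdaptiveProxyKindSketch() {
        extensions.put("javassist", () -> "javassist-based");
        extensions.put("jdk", () -> "jdk-based");
    }

    // Mirrors the generated code: read the key from the URL parameters, apply the default, delegate.
    String describe(Map<String, String> urlParameters) {
        if (urlParameters == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = urlParameters.getOrDefault("proxy", "javassist");
        ProxyKindSketch extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.describe();
    }

    public static void main(String[] args) {
        AdaptiveProxyKindSketch adaptive = new AdaptiveProxyKindSketch();
        Map<String, String> url = new HashMap<>();
        System.out.println(adaptive.describe(url)); // no "proxy" parameter, default applies
        url.put("proxy", "jdk");
        System.out.println(adaptive.describe(url));
    }
}
```

For Protocol$Adaptive the key is the URL's protocol rather than a parameter, with "dubbo" as the default, but the selection logic has the same shape.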

proxyFactory produces the original invoker (javassist by default; it only wraps ref in a thin proxy):

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    try {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    } catch (Throwable fromJavassist) {
        // try fall back to JDK proxy factory
        try {
            Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);
            logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. " +
                "Interfaces: " + type, fromJavassist);
            // log out error
            return invoker;
        } catch (Throwable fromJdk) {
            logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +
                "Interfaces: " + type + " Javassist Error.", fromJavassist);
            logger.error(PROXY_FAILED, "", "", "Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +
                "Interfaces: " + type + " JDK Error.", fromJdk);
            throw fromJavassist;
        }
    }
}
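
What the invoker offers is "call a method on ref by name, parameter types and arguments". The sketch below reproduces that contract with `java.lang.reflect` instead of a Javassist-generated Wrapper, so it is a swapped-in technique rather than Dubbo's implementation; all type names are hypothetical.

```java
import java.lang.reflect.Method;

// Hypothetical service interface and invoker abstraction, used only in this sketch.
interface GreetingServiceSketch {
    String greet(String name);
}

interface MethodInvokerSketch {
    Object doInvoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
}

class ReflectiveInvokerSketch {
    // Builds an invoker that dispatches by method name on the given target.
    static MethodInvokerSketch getInvoker(Object target, Class<?> type) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        };
    }

    public static void main(String[] args) {
        GreetingServiceSketch ref = name -> "hello " + name;
        MethodInvokerSketch invoker = getInvoker(ref, GreetingServiceSketch.class);
        Object result = invoker.doInvoke("greet", new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}
```

A generated wrapper avoids the reflective lookup on every call, which is presumably why Javassist is the default, with the JDK proxy factory as the fallback shown in the listing above.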

protocolSPI first obtains the protocol of type registry. What it gets back is actually a linked list, and each element in the list runs export in turn:

[Figure: ProtocolSPIChain]

Go straight to the export method of the last element, InterfaceCompatibleRegistryProtocol. This class extends RegistryProtocol, so the call actually lands in RegistryProtocol.export:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Get the registry address and the address of the service to export
    URL registryUrl = getRegistryUrl(originInvoker);
    URL providerUrl = getProviderUrl(originInvoker);
    ......
    // Export the service locally and start the Netty server
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // Register with the registry
    final Registry registry = getRegistry(registryUrl);
    final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

    boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registry, registeredProviderUrl);
    }
    ......
    return new DestroyableExporter<>(exporter);
}

Now look at doLocalExport in detail:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
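
The `bounds.computeIfAbsent` call makes the local export idempotent per cache key: the export lambda runs only the first time a key is seen. A minimal sketch of that behaviour, with a hypothetical `LocalExportCacheSketch` and strings standing in for exporters:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LocalExportCacheSketch {
    // Plays the role of the bounds map: cache key -> exporter (a String in this sketch).
    private final Map<String, String> bounds = new ConcurrentHashMap<>();
    private final AtomicInteger exportCalls = new AtomicInteger();

    String doLocalExport(String cacheKey) {
        return bounds.computeIfAbsent(cacheKey, key -> {
            // Runs only when the key has no exporter yet.
            exportCalls.incrementAndGet();
            return "exporter-for-" + key;
        });
    }

    int getExportCalls() {
        return exportCalls.get();
    }

    public static void main(String[] args) {
        LocalExportCacheSketch cache = new LocalExportCacheSketch();
        cache.doLocalExport("dubbo://127.0.0.1:20880/org.demo.GreetingService");
        cache.doLocalExport("dubbo://127.0.0.1:20880/org.demo.GreetingService");
        System.out.println(cache.getExportCalls());
    }
}
```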

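It delegates to protocol.export. This protocol is again an adaptive helper class, the same kind as the protocolSPI described above, so the first step is once more to obtain the protocol's processing chain, this time for the protocol of type dubbo: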

[Figure: ProtocolSPIChain]

The Filter chain is built in ProtocolFilterWrapper.export:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
    }
    // Build the invoker wrapped by the Filters
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
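
Wrapping an invoker in a filter chain usually means folding the filter list from the last filter to the first, so that the first filter ends up outermost. The sketch below shows that construction with hypothetical `CallSketch` and `FilterSketch` interfaces; it does not reproduce Dubbo's FilterChainBuilder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical invoker and filter abstractions for this sketch.
interface CallSketch {
    String invoke(String request);
}

interface FilterSketch {
    String invoke(CallSketch next, String request);
}

class FilterChainSketch {
    // Wraps the original invoker so that filters.get(0) is the outermost layer.
    static CallSketch buildInvokerChain(CallSketch original, List<FilterSketch> filters) {
        CallSketch last = original;
        for (int i = filters.size() - 1; i >= 0; i--) {
            FilterSketch filter = filters.get(i);
            CallSketch next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    static String demo() {
        List<String> trace = new ArrayList<>();
        FilterSketch a = (next, request) -> { trace.add("a"); return next.invoke(request); };
        FilterSketch b = (next, request) -> { trace.add("b"); return next.invoke(request); };
        CallSketch service = request -> { trace.add("service"); return "echo:" + request; };
        String result = buildInvokerChain(service, Arrays.asList(a, b)).invoke("hi");
        return trace + " -> " + result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a, b, service] -> echo:hi"
    }
}
```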

Finally DubboProtocol.export takes over. By this point the invoker has already been wrapped with Filters, structured as shown in the figure below:

[Figure: FilterInvoker]

export mainly builds the serviceKey, whose format is groupName/interfacePath:version:port, and puts it into exporterMap as the key with the DubboExporter as the value (this happens in the DubboExporter constructor). It then starts the Netty server through openServer:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    ......
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

    openServer(url);
    ...
    return exporter;
}
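
To make the key format concrete, here is a minimal sketch that builds a key of the form groupName/interfacePath:version:port and uses it for a map lookup. `ServiceKeySketch` is hypothetical; treating an empty group or version as "omit that segment" is an assumption of this sketch, and Dubbo's own serviceKey helper may handle further special cases.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceKeySketch {
    // Builds groupName/interfacePath:version:port; empty group or version is skipped (assumption).
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(path);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // Plays the role of exporterMap: service key -> exporter (a String in this sketch).
        Map<String, String> exporterMap = new ConcurrentHashMap<>();
        String key = serviceKey(20880, "org.demo.GreetingService", "1.0.0", "g1");
        exporterMap.put(key, "exporter-for-GreetingService");

        // An incoming request carries path, version and group; the port comes from the channel.
        String lookup = serviceKey(20880, "org.demo.GreetingService", "1.0.0", "g1");
        System.out.println(key);
        System.out.println(exporterMap.get(lookup));
    }
}
```

Because the provider side and the request side derive the key from the same four values, the exporter can be found again when a consumer request arrives.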

Now look at where the server is actually created; it is ultimately started in createServer:

private ProtocolServer createServer(URL url) {
    ......
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    ......
    DubboProtocolServer protocolServer = new DubboProtocolServer(server);
    loadServerProperties(protocolServer);
    return protocolServer;
}

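createServer starts the server through Exchangers.bind. The requestHandler passed in is the request handler; it is initialized when DubboProtocol is constructed and handles requests from consumers: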

public DubboProtocol(FrameworkModel frameworkModel) {
    requestHandler = new ExchangeHandlerAdapter(frameworkModel) {

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
            // switch TCCL
            if (invoker.getUrl().getServiceModel() != null) {
                Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
            }
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()
                        + " not found in callback service interface ,invoke will be ignored."
                        + " please update the api interface. url is:"
                        + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isDebugEnabled()) {
                logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
                        tryToGetStubService(channel, invocation);
                    }
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private void tryToGetStubService(Channel channel, Invocation invocation) throws RemotingException {
            try {
                Invoker<?> invoker = getInvoker(channel, invocation);
            } catch (RemotingException e) {
                String serviceKey = serviceKey(
                    0,
                    (String) invocation.getObjectAttachmentWithoutConvert(PATH_KEY),
                    (String) invocation.getObjectAttachmentWithoutConvert(VERSION_KEY),
                    (String) invocation.getObjectAttachmentWithoutConvert(GROUP_KEY)
                );
                throw new RemotingException(channel, "The stub service[" + serviceKey + "] is not found, it may not be exported yet");
            }
        }

        /**
         * FIXME channel.getUrl() always binds to a fixed service, and this service is random.
         * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
         * service this connection is binding to.
         * @param channel
         * @param url
         * @param methodKey
         * @return
         */
        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }

            RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getGroup());
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getVersion());
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }
    };
    this.frameworkModel = frameworkModel;
}
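
One detail in reply is easy to misread: the callback check compares the invoked method name against the comma-separated "methods" URL parameter, with a separate branch for a missing or single-valued parameter. The sketch below isolates that check; `CallbackMethodCheckSketch` is a hypothetical helper, not a Dubbo class.

```java
class CallbackMethodCheckSketch {
    // methodsStr is the "methods" URL parameter: null, a single name, or a comma-separated list.
    static boolean hasMethod(String methodName, String methodsStr) {
        if (methodsStr == null || !methodsStr.contains(",")) {
            // A null parameter never matches; a single name must match exactly.
            return methodName.equals(methodsStr);
        }
        for (String method : methodsStr.split(",")) {
            if (methodName.equals(method)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasMethod("greet", "greet,bye"));
        System.out.println(hasMethod("other", "greet,bye"));
        System.out.println(hasMethod("greet", null));
    }
}
```

In the handler above, a callback invocation whose method fails this check is logged and ignored (reply returns null) instead of being dispatched to the invoker.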

Finally, NettyTransporter starts a Netty server:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

This completes the provider startup flow.

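Original work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.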
