首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >京东技术解密之配置中心DUCC

京东技术解密之配置中心DUCC

作者头像
公众号_松华说
发布2019-07-16 11:35:06
8.3K1
发布2019-07-16 11:35:06
举报
文章被收录于专栏:松华说松华说

一、使用方法

简单说下DUCC的特点

支持多环境(或称分组),分组可以合并 内置强大的基于插件的数据绑定框架,支持多种类型转换; 支持Log4j、Log4j2、Logback的动态修改日记级别功能。 支持Spring原生注解、支持自定义注解,客户端代码入侵性低 支持客户端多配置源,支持自定义配置,如ZK、Consol扩展 支持配置预案切换

接下来说说怎么用,下面我代码中的ConfiguratorManager就是DUCC的配置管理类

@Configuration
@Log4j2
@Order(value = 0)
public class Config extends PropertyPlaceholderConfigurer {

    //发布系统的配置
    private static Map<String,String> joneProperty =  new HashMap<>();
    //DUCC系统的配置
    private static Map<String,String> duccProperty = new HashMap<>();

    @Bean(initMethod = "start" , destroyMethod = "stop")
    public  ConfiguratorManager configuratorManager()
    {
        ConfiguratorManager configuratorManager = ConfiguratorManager.getInstance() ;
        configuratorManager.setApplication(getJoneProperty("laf.config.manager.application"));
        configuratorManager.addResource(new Resource("ucc",getJoneProperty("laf.config.manager.uri")));
        configuratorManager.addListener(new ConfigurationListener.CustomConfigurationListener("ucc") {
            @Override
            public void onUpdate(com.jd.laf.config.Configuration configuration) {
                List<Property> properties = configuration.getProperties();
                for (Property property:properties) {
                    log.info("duccConfig update key:{}",property.getKey());
                    duccProperty.put(property.getKey(), String.valueOf(property.getValue()));
                }
            }
        });
        return configuratorManager ;
    }

    public String getDuccProperty(String key)
    {
        if(duccProperty.containsKey(key)) {
            return duccProperty.get(key);
        }
        Property property = configuratorManager().getProperty(key);
        if(property==null || String.valueOf(property.getValue()).isEmpty()) {
            log.error("配置降级,key:{}",key);
            return getJoneProperty(key);
        }
        duccProperty.put(key, String.valueOf(property.getValue()));
        return String.valueOf(property.getValue());
    }

    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactory, Properties props)throws BeansException {
        super.processProperties(beanFactory, props);
        for (Object key : props.keySet()) {
            String keyStr = key.toString();
            joneProperty.put(keyStr, String.valueOf(props.getProperty(keyStr)));
        }
    }

    public  String getJoneProperty(String key)
    {
        return joneProperty.get(key);
    }

}

二、DUCC各重要模块解读

1、ConfiguratorManager统一配置管理类

主要步骤: 1.1 从服务器获取配置信息 1.2 将配置保存到本地 1.3 启动更新事件消费者 1.4 为每个Resource(可以理解为分组)启动变更监控线程Watcher,监听配置变更

public class ConfiguratorManager implements ConfigurationSupplier, Watchable {


      //通知
    protected Notifier notifier = new Notifier() {
        @Override
        public <M, T extends Listener> void send(M property, List<T> listeners) {
            inform(property, listeners);
        }
    };

    //事件
protected BlockingQueue<Resource> events = new ArrayBlockingQueue<Resource>(5000);


     @Override
    public Property getProperty(final String key) {
         //最终是通过configuration获取的,后面会讲到
        return configuration.getProperty(name);
    }

     @Override
    public boolean addListener(final PropertyListener listener) {

           //判断是否重复添加

           groupResources.addListener(listener.getKey(), listener);
       }

         /**
     * 通知监听器,notify执行send方法时会触发
     *
     * @param property  变更的配置
     * @param listeners 监听器
     */
    protected <M, T extends Listener> void inform(final M property, final List<T> listeners) {
        if (listeners == null) {
            return;
        }
        for (final T listener : listeners) {
            if (!isStarted()) {
                return;
            }
            notifierThreads.execute(new Runnable() {
                @Override
                public void run() {
                    if (isStarted()) {
                        listener.onUpdate(property);
                    }
                }
            });
        }
    }


        /**
     * 启动
     */
    public void start() throws Exception {
         //验证降级文件路径可写等
        validate();
        synchronized (mutex) {
            if (started.compareAndSet(false, true)) {
                ExecutorService executorService = null;
                try {
                //资源URL解析,排序合并
                initializeResource()
                resource.setConfigurator(configurator)
                //远程加载,初始化上下文context
                startRemote(resources, executorService)
                //通知,最后还是会执行到notifier.send()方法
                inform(groupResource, groupConfiguration);
                //启动变更监听
                consumer = new Thread(new EventConsumer());
                consumer.start();
                //所有资源已经初始化过,安全启动监听器
                //支持多个分组,推荐重复度高的单独抽出来
                for (Resource resource : groupResources) {
                    if (resource.isReady()) {
                        resource.watch();
                    }
                }
               } catch (Exception e) {
                stop();
                throw e;
            } finally {
                if (executorService != null) {
                    executorService.shutdownNow();
                }
            }
            }
        }

     }


    protected class EventConsumer implements Runnable {
            @Override
            public void run() {
                while (isStarted() && !Thread.currentThread().isInterrupted()) {
                    try {
                        Resource event = events.poll(1000, TimeUnit.MICROSECONDS);
                        if (event != null) {
                            onUpdateConfig(event);
                        }
                    } catch (InterruptedException e) {
                       //必须重置中断信号标记
                        Thread.currentThread().interrupt();
                    }
                }
        }

        /**
         * 资源-配置发生变更
         *
         * @param resource
         */
        protected void onUpdateConfig(final Resource resource) {
           //省略很多行
           /获取旧的配置项
            //获取新的配置项
            //判断是否变更或者新增或者有删除
            //通知配置监听器,最后还是会执行到notifier.send()
            inform(newProperty, listeners);
        }
 }

2、监听器容器Observer

Observer包含了各种Listener,同时拥有一个ConfiguratorManager的成员变量,Lister最终会传递到该变量中

/**
 * 监听器容器
 */
public class Observer implements InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

     protected ConfiguratorManager manager;

     /**
     *
     *   省略其他
     */
     public void addPropertyListener(final PropertyListener listener) {
        propertyListeners.add(listener);
    }

        /**
     * 添加监听器
     */
    public synchronized void execute() {
        if (!processed) {
            if (manager != null) {
                for (PropertyListener listener : propertyListeners) {
                    manager.addListener(listener);
                }
                for (ConfigurationListener listener : configurationListeners) {
                    manager.addListener(listener);
                }
                for (Map.Entry<String, String> entry : scriptListeners.entrySet()) {
                    manager.addListener(new JavaScriptListener(entry.getKey(), entry.getValue(), context));
                }
                //针对@ConfigurationPropties注解
                for (ConfigPropertiesBean cfgBean : beans) {
                    String prefix = cfgBean.getPrefix();
                    Object bean = cfgBean.getBean();
                    if (bean == null && context.containsBean(cfgBean.getBeanName())) {
                        bean = context.getBean(cfgBean.getBeanName());
                    }
                    if (bean != null) {
                        //添加监听器
                        process(bean, cfgBean.getBeanName(), prefix);
                    }
                }
            }
            processed = true;
        }
    }

    /**
    * 当BeanFactory设置完Bean属性后会调用此方法,可以添加初始化方法
    ***/

      @Override
    public void afterPropertiesSet() throws Exception {

        //在BeanDefinition中定义注入的
        if (fieldListeners != null) {
            for (FieldListener fieldListener : fieldListeners) {
                addFieldListener(fieldListener);
            }
        }

        if (methodListeners != null) {
            for (MethodListener methodListener : methodListeners) {
                addMethodListener(methodListener);
            }
        }
    }

    /**
     * 获取上下文的引用,需要实现ApplicationContextAware接口
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.context = applicationContext;
    }

    @Override
    public void onApplicationEvent(final ContextRefreshedEvent event) {
        if (event.getSource() == context) {
            execute();
        }
    }

}

如果在上下文中部署一个实现了ApplicationListener接口的Bean,那么每当在一个ApplicationEvent发布到ApplicationContext时,这个Bean会得到通知,其实这就是标准的Oberver设计模式 当ApplicationContext实例完成后,会调用onApplicationEvent()方法,执行execute()方法,然后将PropertyListener/ConfigurationLister添加到ConfiguratorManager实例中

3、Bean实例化后置处理器ConfigPostProcessor

/**
 * Bean实例化后置处理器,保存ConfiguratorManager实例,处理Bean的配置
 *
 */
public class ConfigPostProcessor implements BeanPostProcessor, PriorityOrdered, ApplicationContextAware {

    protected ApplicationContext applicationContext;

    protected Observer observer;

    /**
     * PostProcessBeforeInitialization 方法会在Bean构造完成后(构造方法执行完成),初始化方法(init-method)方法调用之前被调用
     * */
    @Override
        public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
    //处理配置对象
    if (!process(bean, beanName, observer)) {
        //处理属性监听器
        doWithFields(bean.getClass(), new FieldCallback() {
            @Override
            public void doWith(final Field field) {
                process(field, bean, beanName, observer);
            }
        }, new FieldFilter() {
            @Override
            public boolean matches(final Field field) {
                return !Modifier.isFinal(field.getModifiers()) && !Modifier.isStatic(field.getModifiers());
            }
        });

        //处理方法生成的配置对象
        doWithMethods(bean.getClass(), new MethodCallback() {
            @Override
            public void doWith(Method method) {
                process(method, bean, beanName, observer);
            }
        });
    }

    return bean;
        }

    /**
     * 处理方法
     *
     * @param method
     * @param bean
     * @param beanName
     */
    protected void process(final Method method, final Object bean, final String beanName, final Observer observer) {

    }

    /**
     * 处理字段
     *
     * @param field
     * @param bean
     * @param beanName
     */
    protected void process(final Field field, final Object bean, final String beanName, final Observer observer) {
        //SPI服务发现
        for (FieldProcessor processor : FIELD.extensions()) {
            processor.process(bean, beanName, field, observer);
        }
    }

    /**
     * 处理Bean
     *
     * @param bean
     * @param beanName
     * @return
     */
    protected boolean process(final Object bean, final String beanName, final Observer observer) {

    }

    /**
    *维持Observer的成员变量
    **/
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (bean instanceof ConfiguratorManager && observer.getManager() == null) {
            observer.setManager((ConfiguratorManager) bean);
        }
        if (bean instanceof ConfigurationListener) {
            observer.addConfigurationListener((ConfigurationListener) bean);
        }
        if (bean instanceof PropertyListener) {
            observer.addPropertyListener((PropertyListener) bean);
        }
        return bean;
    }


    @Override
    public int getOrder() {
         //优先级要放最低
        return Ordered.LOWEST_PRECEDENCE;
    }


    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        this.applicationContext = context;
        this.observer = context.getBean(Observer.class);
    }
}

我们可以看到ConfigPostProcessor加载通过SPI服务发现的方法\字段处理类,然后执行process方法,其内部封装了Observer.addPropertyListener

4、Bean工厂后置处理器PropertySourcesFactorPostProcess

/**
 * 注册PropertySource
 */
public class PropertySourcesFactoryPostProcessor implements     BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {

    protected ConfigurableEnvironment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = (ConfigurableEnvironment) environment;
    }

    @Override
    public int getOrder() {
        //最低优先级,先处理Spring内置的PropertySources
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void postProcessBeanFactory(final ConfigurableListableBeanFactory factory) throws BeansException {
        //其它的PropertySource已经加载了,可以安全创建ConfiguratorManager
        ConfiguratorManager manager = factory.getBean(CONFIGURATOR_MANAGER, ConfiguratorManager.class);
        Observer observer = factory.getBean(OBSERVER_BEAN_NAME, Observer.class);
        //将manager设置为observer的成员变量
        if (observer.getManager() == null) {
            observer.setManager(manager);
        }
        // 注册 Spring 属性配置
        environment.getPropertySources().addFirst(new ConfigSource(manager));
        //Bean的定义已经注册,由于ConfiguratorManager延迟加载,存在Bean定义中的占位符没有被替换的情况。
        //用ConfiguratorManager替换一下剩余的占位符
        resolvePlaceHolder(factory, manager);
    }

    /**
     * 解析Bean的变量
     *
     * @param factory
     * @param manager
     */
    protected void resolvePlaceHolder(final ConfigurableListableBeanFactory factory, final ConfiguratorManager manager) {
        StringValueResolver valueResolver = new ConfigResolver(manager);
        BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);

        String[] beanNames = factory.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            if (!factory.containsSingleton(beanName)) {
                //Bean已经实例化为单例
                BeanDefinition bd = factory.getBeanDefinition(beanName);
                try {
                    visitor.visitBeanDefinition(bd);
                } catch (Exception ex) {
                    throw new BeanDefinitionStoreException(bd.getResourceDescription(), beanName, ex.getMessage(), ex);
                }
            }
        }

        // New in Spring 2.5: resolve placeholders in alias target names and aliases as well.
        factory.resolveAliases(valueResolver);
        // New in Spring 3.0: resolve placeholders in embedded values such as annotation attributes.
        factory.addEmbeddedValueResolver(valueResolver);
    }
}

可以看到在PostProcessBeanFactory方法中,实例化了ConfiguratorManager和Observer,并把Manager设置为Observer的成员变量。另外构造了一个包含Manager的配置属性源propertrySources(属性集合,内部封装个多个k/v),并放到Sping属性源的第一个

5、资源配置器

public abstract class AbstractConfigurator implements Configurator, Prototype {

    /**
     * 初始化
     *
     * @param context
     * @throws Exception
     */
    Configurator setup(Context context) throws Exception {
    }

    /**
     * 拉取配置
     *
     * @param version     当前版本
     * @param longPolling 长轮询时间
     * @return
     * @throws Exception
     */
    Configuration pull(long version, int longPolling) throws Exception {
    }

    /**
     * 监听配置变化
     *
     * @param version 当前版本
     * @return
     */
    boolean watch(long version) {
         //这里只判断properties是否有更新,不涉及更新的具体类型
        if (!configuration.equals(resource.getConfiguration())) {
            resource.setConfiguration(configuration);
            //最终调用的是events.add(resource),也就是manager中的那个events
            context.fire(resource);
        }
    }

    /**
     * 停止监听
     */
    void stop() {}

    /**
     * 资源来源
     *
     * @return 资源来源。
     */
    Source source() {}

    /**
     * 从URL中返回资源名称
     *
     * @param url URL
     * @return 资源名称
     */
    String name(URL url){}

}

Configuration主要是对properties进行操作,Resource封装了Configuration。DUCC通过SPI服务发现将FileConfigurator、SystemConfigurator等extends了AbstractConfigurator的类自动加载进来从而达到可插拨扩展其他配置源的效果,也是通过这种机制支持所有数据格式、适配其他操作系统、实现方法字段属性配置化

6、SPI服务发现

Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制,常见的JDBC、SLF4门面就是通过这个实现的,下面是DUCC的运用

    /**
 * 插件管理器
 */
public interface Plugin {
    /**
     * 字段处理器扩展点
     */
    ExtensionPoint<FieldProcessor, String> FIELD = new ExtensionPointLazy<FieldProcessor, String>(FieldProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * Bean处理器扩展点
     */
    ExtensionPoint<BeanProcessor, String> BEAN = new ExtensionPointLazy<BeanProcessor, String>(BeanProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * 方法处理器扩展点
     */
    ExtensionPoint<MethodProcessor, String> METHOD = new ExtensionPointLazy<MethodProcessor, String>(MethodProcessor.class, SpiLoader.INSTANCE, null, null);
}



public class SpiLoader implements ExtensionLoader {
    public static final ExtensionLoader INSTANCE = new SpiLoader();

    public SpiLoader() {
    }

    public <T> Collection<Plugin<T>> load(Class<T> clazz) {
        if (clazz == null) {
            return null;
        } else {
            List<Plugin<T>> result = new LinkedList();
            ServiceLoader<T> plugins = ServiceLoader.load(clazz);
            Iterator var4 = plugins.iterator();

            while(var4.hasNext()) {
                //这里的plugin不是上面的那个plugin
                T plugin = var4.next();
                result.add(new Plugin(new Name(plugin.getClass()), plugin, this));
            }

            return result;
        }
    }
}

通过SPI机制将接口的实现类全部加载并实例化一遍,前提是实现类名称放在"META-INF/services/接口名"。当在ConfigPostProcessor中执行METHOD.extensions()时会将实现了MethodProcessor接口的实例取出来

三、SpingBoot注解(@EnableLafConfig)流程总结

DUCC通过实现ImportBeanDefinitionRegistrar接口,将指定的类注册到Spring Boot容器中,另外必须定义一个Java配置类(带有注解@Configuration)通过@Import指定ImportBeanDefinitionRegistrar的实现类。在registerBeanDefinitions()完成了以下几个类的BeanDefinition的注册:

  1. 1、配置管理器ConfiguratorManager,监听器容器Observer
  2. 2、Bean实例化后置处理器ConfigPostProcessor
  3. 3、Bean工厂后置处理器PropertySourcesFactoryPostProcessor

四、Spring初始化流程

主要流程在AbstractApplicationContext.refresh()中

流程图备注

  1. 1、InvokeBeanFactoryPostProcessors() 在Bean未开始实例化时,执行工厂后置处理器,会查找所有BeanFactoryPostProcessor实现类Bean,并且调用方法PostProcessBeanDefinitionRegistry,修改Definition的定义 注意:DUCC中PropertySourcesFactoryPostProcessor实现了BeanFactoryPostProcessor把ConfiguratorManager赋值给Observer
  2. 2、RegisterBeanPostProcessors() 将BeanPostProcessors的实现类添加到工厂的RegisterBeanPostProcessors中

五、Bean实例生命周期

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 松华说 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、使用方法
  • 二、DUCC各重要模块解读
    • 1、ConfiguratorManager统一配置管理类
      • 2、监听器容器Observer
        • 3、Bean实例化后置处理器ConfigPostProcessor
        • 4、Bean工厂后置处理器PropertySourcesFactorPostProcess
          • 5、资源配置器
            • 6、SPI服务发现
            • 三、SpingBoot注解(@EnableLafConfig)流程总结
            • 四、Spring初始化流程
            • 五、Bean实例生命周期
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档