前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >是时候抛弃 ConfigServer 了,试试 Nacos 统一配置中心动态刷新机制真香

是时候抛弃 ConfigServer 了,试试 Nacos 统一配置中心动态刷新机制真香

作者头像
猿芯
发布于 2021-05-27 10:08:01
发布于 2021-05-27 10:08:01
2.7K00
代码可运行
举报
运行总次数:0
代码可运行

原文 《nacos统一配置中心源码解析》| https://u.nu/mm1i7

配置文件想必大家都很熟悉,无论什么架构 都离不开配置,虽然spring boot已经大大简化了配置,但如果服务很多 环境也好几个,管理配置起来还是很麻烦,并且每次改完配置都需要重启服务,nacos config出现就解决了这些问题,它把配置统一放到服务进行管理,客户端这边进行有需要的获取,可以实时对配置进行修改和发布

如何使用nacos config

首先需要引入nacos config jar包

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

在nacos控制台提前配置需要的配置文件

配置文件格式支持text、json、xml、yaml、html、properties,注意spring boot启动支持的配置文件格式只能为yaml或properties格式,其它格式的配置文件需要后续我们自己写代码去获取

我们来看db.properties也是就数据库配置

data id就是对应配置文件id,group为分组,配置内容就是properties格式的

再来看bootstrap.properties如何引用这个配置文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.application.name=nacos-config
server.port=20200

#命名空间
spring.cloud.nacos.config.namespace=${nacos_register_namingspace:0ca74337-8f42-49c3-aec9-32f268a937c4}
#组名
spring.cloud.nacos.config.group=${spring.application.name}
#文件格式
spring.cloud.nacos.config.file-extension=properties
#nacos server地址
spring.cloud.nacos.config.server-addr=localhost:8848

#加载配置文件
spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
spring.cloud.nacos.config.ext-config[1].data-id=db.properties
spring.cloud.nacos.config.ext-config[2].data-id=mybatis-plus.properties

注意 加载配置文件的分组名默认为DEFAULT_GROUP,如需指定分组 需要再指定

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties
spring.cloud.nacos.config.ext-config[0].group=${spring.cloud.nacos.config.group}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
或者
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.cloud.nacos.config.ext-config[1].data-id=undertow.properties
spring.cloud.nacos.config.ext-config[1].group=MY_DEFAULT

在这里解释下namespace和group的概念,namespace可以用来解决不同环境的问题,group是来管理配置分组的,它们的关系如下图

spring boot启动容器如何加载nacos config配置文件

这个配置作用是spring在启动之间准备上下文时会启用这个配置 来导入nacos相关配置文件,为后续容器启动做准备

来看NacosConfigBootstrapConfiguration这个配置类

NacosConfigProperties:对应我们上面在bootstrap.properties中对应的配置信息

NacosConfigManager: 持有NacosConfigProperties和ConfigService,ConfigService用来查询 发布配置的相关接口

NacosPropertySourceLocator:它实现了PropertySourceLocator ,spring boot启动时调用PropertySourceLocator.locate(env)用来加载配置信息,下面来看相关源码

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/******************************************NacosPropertySourceLocator******************************************/
public PropertySource<?> locate(Environment env) {
    ConfigService configService = this.nacosConfigProperties.configServiceInstance();
    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    } else {
        long timeout = (long)this.nacosConfigProperties.getTimeout();
        this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
        String name = this.nacosConfigProperties.getName();
        String dataIdPrefix = this.nacosConfigProperties.getPrefix();
        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = name;
        }

        if (StringUtils.isEmpty(dataIdPrefix)) {
            dataIdPrefix = env.getProperty("spring.application.name");
        }

        CompositePropertySource composite = new CompositePropertySource("NACOS");
        // 加载共享的配置文件 不同指定分组 默认DEFAULT_GROUP,对应配置spring.cloud.nacos.config.sharedDataids=shared_1.properties,shared_2.properties
        this.loadSharedConfiguration(composite);
        // 对应spring.cloud.nacos.config.ext-config[0].data-id=nacos.properties的配置
        this.loadExtConfiguration(composite);
        // 加载当前应用配置
        this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);
        return composite;
    }
}
代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 看一个加载实现即可 流程都差不多 具体实现在NacosPropertySourceBuilder.loadNacosData()方法完成
/******************************************具体实现在NacosPropertySourceBuilder******************************************/
private Properties loadNacosData(String dataId, String group, String fileExtension) {
        String data = null;

        try {
            // 向nacos server拉取配置文件
            data = this.configService.getConfig(dataId, group, this.timeout);
            if (!StringUtils.isEmpty(data)) {
                log.info(String.format("Loading nacos data, dataId: '%s', group: '%s'", dataId, group));
                // spring boot配置当然只支持properties和yaml文件格式
                if (fileExtension.equalsIgnoreCase("properties")) {
                    Properties properties = new Properties();
                    properties.load(new StringReader(data));
                    return properties;
                }

                if (fileExtension.equalsIgnoreCase("yaml") || fileExtension.equalsIgnoreCase("yml")) {
                    YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
                    yamlFactory.setResources(new Resource[]{new ByteArrayResource(data.getBytes())});
                    return yamlFactory.getObject();
                }
            }
        } catch (NacosException var6) {
            log.error("get data from Nacos error,dataId:{}, ", dataId, var6);
        } catch (Exception var7) {
            log.error("parse data from Nacos error,dataId:{},data:{},", new Object[]{dataId, data, var7});
        }

        return EMPTY_PROPERTIES;
    }
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
至此我们在nacos上配置的properties和yaml文件都载入到spring配置文件中来了,后面可通过context.Environment.getProperty(propertyName)来获取相关配置信息

配置如何随spring boot加载进来我们说完了,接来下来看修改完配置后如何实时刷新

nacos config动态刷新

当nacos config更新后,根据配置中的refresh属性来判断是否刷新配置,配置如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spring.cloud.nacos.config.ext-config[0].refresh=true

首先sprin.factories 配置了EnableAutoConfiguration=NacosConfigAutoConfiguration,NacosConfigAutoConfiguration配置类会注入一个NacosContextRefresher,它首先监听了ApplicationReadyEvent,然后注册一个nacos listener用来监听nacos config配置修改后发布一个spring refreshEvent用来刷新配置和应用

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware

public void onApplicationEvent(ApplicationReadyEvent event) {
    // 只注册一次
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}
    
private void registerNacosListenersForApplications() {
    if (this.refreshProperties.isEnabled()) {
        Iterator var1 = NacosPropertySourceRepository.getAll().iterator();
        while(var1.hasNext()) {
            NacosPropertySource nacosPropertySource = (NacosPropertySource)var1.next();
            // 对应刚才所说的配置 需要配置文件是否需要刷新
            if (nacosPropertySource.isRefreshable()) {
                String dataId = nacosPropertySource.getDataId();
                // 注册nacos监听器
                this.registerNacosListener(nacosPropertySource.getGroup(), dataId);
            }
        }
    }
}
    
private void registerNacosListener(final String group, final String dataId) {
    Listener listener = (Listener)this.listenerMap.computeIfAbsent(dataId, (i) -> {
        return new Listener() {
            public void receiveConfigInfo(String configInfo) {
                NacosContextRefresher.refreshCountIncrement();
                String md5 = "";
                if (!StringUtils.isEmpty(configInfo)) {
                    try {
                        MessageDigest md = MessageDigest.getInstance("MD5");
                        md5 = (new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))).toString(16);
                    } catch (UnsupportedEncodingException | NoSuchAlgorithmException var4) {
                        NacosContextRefresher.log.warn("[Nacos] unable to get md5 for dataId: " + dataId, var4);
                    }
                }
                // 添加刷新记录
                NacosContextRefresher.this.refreshHistory.add(dataId, md5);
                // 发布一个spring refreshEvent事件 对应监听器为RefreshEventListener 该监听器会完成配置的更新应用
                NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                if (NacosContextRefresher.log.isDebugEnabled()) {
                    NacosContextRefresher.log.debug("Refresh Nacos config group " + group + ",dataId" + dataId);
                }

            }
            public Executor getExecutor() {
                return null;
            }
        };
    });

    try {
        this.configService.addListener(dataId, group, listener);
    } catch (NacosException var5) {
        var5.printStackTrace();
    }

}

我们说完了nacos config动态刷新,那么肯定有对应的动态监听,nacos config会监听nacos server上配置的更新状态

nacos config动态监听

一般来说客户端和服务端数据交互无非就两种方式

pull:客户端主动从服务器拉取数据

push: 由服务端主动向客户端推送数据

这两种模式优缺点各不一样,pull模式需要考虑的是什么时候向服务端拉取数据 可能会存在数据延迟问题,而push模式需要客户端和服务端维护一个长连接 如果客户端较多会给服务端造成压力 但它的实时性会更好

nacos采用的是pull模式,但它作了优化 可以看做是pull+push,客户端会轮询向服务端发出一个长连接请求,这个长连接最多30s就会超时,服务端收到客户端的请求会先判断当前是否有配置更新,有则立即返回

如果没有服务端会将这个请求拿住“hold”29.5s加入队列,最后0.5s再检测配置文件无论有没有更新都进行正常返回,但等待的29.5s期间有配置更新可以提前结束并返回,下面会在源码中讲解具体怎么处理的

nacos client处理

动态监听的发起是在ConfigService的实现类NacosConfigService的构造方法中,它是对外nacos config api接口,在之前加载配置文件和NacosContextRefresher构造方法中都会获取或创建

这里都会先判断是否已经创建了ConfigServer,没有则实例化一个NacosConfigService,来看它的构造函数

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/***************************************** NacosConfigService *****************************************/
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    // 用来向nacos server发起请求的代理,这里用到了装饰模式
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    // 客户端的一个工作类,agent作为它的构造传参 可猜想到里面肯定会做一些远程调用
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}

/***************************************** ClientWorker *****************************************/
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter

    init(properties);
    // 这个线程池只有一个核心线程 用来执行checkConfigInfo()方法
    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // 其它需要执行线程的地方都交给这个线程池来处理
    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    
    // 执行一个调用checkConfigInfo()方法的周期性任务,每10ms执行一次,首次执行延迟1ms后执行
    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

NacosConfigService构造方法主要创建一个agent 它是用来向nacos server发出请求的,然后又创建了一个clientwoker,它的构造方法创建了两个线程池,第一个线程池只有一个核心线程,它会执行一个周期性任务只用来调用checkconfiginfo()方法,第二个线程是后续由需要执行线程的地方都交给它来执行,下面重点来看checkconfiginfo()方法

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
    new HashMap<String, CacheData>());

cacheMap:缓存着需要刷新的配置,它是在调用ConfigService 添加监听器方式时会放入,可以自定义监听配置刷新

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 添加一个config监听器,用来监听dataId为ErrorCode,group为DEFAULT_GROUP的config
configService.addListener("ErrorCode","DEFAULT_GROUP",new Listener() {
    @Override
    public Executor getExecutor() {
        return null;
    }

    @Override
    public void receiveConfigInfo(String s) { //当配置更新时会调用监听器该方法
        Map<String, Map<String, String>> map = JSON.parseObject(s, Map.class);
        // 根据自己的业务需要来处理
    }
});

这里采用了一个策略:将cacheMap中的数量以3000分一个组,分别创建一个LongPollingRunnable用来监听配置更新,这个LongPollingRunnable就是我们之前所说的长连接任务,来看这个长连接任务

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class LongPollingRunnable implements Runnable {
    private int taskId;

    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.get().values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // 1、检查本地配置
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }

            // 2、向nacos server发出一个长连接 30s超时,返回nacos server有更新过的dataIds
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // 3、向nacos server请求获取config最新内容
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                } 
            }
            // 4、对有变化的config调用对应监听器去处理
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            // 继续轮询
            executorService.execute(this);
        } catch (Throwable e) {
            // 发生异常延迟执行
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

这个长轮询主要做了4个步骤

  1. 检查本地配置,如果存在本地配置,并且与缓存中的本地配置版本不一样,把本地配置内容更新到缓存,并触发事件,这块源码比较简单,读者跟到源码一读编制
  2. 向nacos server发出一个长连接,30s超时,nacos server会返回有变化的dataIds
  3. 根据变化的dataId,从服务端拉取最新的配置内容然后更新到缓存中
  4. 对有变化的配置 触发事件监听器来处理

讲完了nacos client处理流程,再来看服务端这边怎么处理这个长连接的

nacos server处理

服务端长连接接口是/config/listener,对应源码包为config

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/****************************************** ConfigController ******************************************/
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    // 需要检查更新的config信息
    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    // 长连接处理
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

/****************************************** ConfigServletInner ******************************************/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    
    // 判断是否支持长轮询
    if (LongPollingService.isSupportLongPolling(request)) {
        // 长轮询处理
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    
    // 不支持长轮询,直接与当前配置作比较,返回有变更的配置
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    /*
    * 省略
    * 会响应变更的配置信息
    */
    return HttpServletResponse.SC_OK + "";
}

/****************************************** LongPollingService ******************************************/
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    
    // 服务端这边最多处理时长29.5s,需要留0.5s来返回,以免客户端那边超时
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        // 不支持长轮询 本地对比返回
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            // log....
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            // log....
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    
    // 将http响应交给异步线程,返回一个异步响应上下文, 当配置更新后可以主动调用及时返回,不用非等待29.5s
    final AsyncContext asyncContext = req.startAsync();
    
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);
    // 执行客户端长连接任务,
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

/****************************************** ClientLongPolling ******************************************/
class ClientLongPolling implements Runnable {
        
    @Override
    public void run() {
        // 提交一个任务,延迟29.5s执行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() {
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this);
                    
                    if (isFixedPolling()) {
                        // 检查变更配置 并相应
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null);
                        }
                    } else {
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
                
            }
            
        }, timeoutTime, TimeUnit.MILLISECONDS);
        
        allSubs.add(this);
    }
}


final Queue<ClientLongPolling> allSubs

上面大部分地方都比较好懂,主要解释下ClientLongPolling作用,它首先会提交一个任务,无论配置有没有更新 最终都会进行响应,延迟29.5s执行,然后会把自己添加到一个队列中,之前说过,服务端这边配置有更新后 会找出正在等待配置更新的长连接任务,提前结束这个任务并返回,

来看这一步是怎么处理的

代码语言:javascript
代码运行次数:0
运行
复制
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe LocalDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
    
}

class DataChangeTask implements Runnable {
        
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey);
            // 找出等在该配置的长连接,然后进行提前返回
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next();
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                        continue;
                    }
                    
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                        continue;
                    }
                    
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships.
                    clientSub.sendResponse(Arrays.asList(groupKey));
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

LongPollingService构造函数中,会注册一个订阅,用来监听LocalDataChangeEvent,当发生该事件时,会执行一个数据变更任务,这个任务就是找出等在配置的长连接,提前返回

我们在nacos控制台修改一个配置文件进行发布,会调用ConfigController.publishConfig接口,但这个接口发布的是ConfigDataChangeEvent事件,大意了。。。LocalDataChangeEvent事件发布在ConfigCacheService,这里怎么调用的我就不深追,留给有兴趣的读者

至此nacos config动态监听、刷新就串联起来了,nacos的相关源码都比较好理解,跟着源码追进去就一目了然了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构荟萃 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
双十一云服务采购指南:腾讯云服务器CVM安装与配置
双十一不仅仅是购物狂欢节,对于希望享受高性能云服务的企业和开发者而言,更是入手腾讯云产品的绝佳时机!然而,面对琳琅满目的产品与各种优惠活动,如何才能买到性价比最高的产品?本文将为你深入剖析腾讯云双十一的优惠机制,教你如何用最少的预算获取最佳的云服务配置,助你在年末冲刺中脱颖而出。
一键难忘
2024/11/10
2141
双十一云服务采购指南:腾讯云服务器CVM安装与配置
云服务器
云服务器(Cloud Virtual Machine,CVM)为您提供安全可靠的弹性计算服务。 只需几分钟,您就可以在云端获取和启用 CVM,来实现您的计算需求。随着业务需求的变化,您可以实时扩展或缩减计算资源。 CVM 支持按实际使用的资源计费,可以为您节约计算成本。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
用户6248011
2019/09/27
53.8K0
腾讯云-云服务器概述&售前
云服务器(Cloud Virtual Machine , CVM)提供安全可靠的弹性计算服务,只需要几分钟,可以在云端获取和启动CVM实现你的计算需求,随着业务的需求变化, 你可以实时扩展或者缩减计算资源,CVM 支持按实际使用的资源计费,可以节约计算成本,使用CVM 可以极大降低软硬件的采购成本,简化IT运维工作。
Chris Fei
2021/05/03
37.5K0
​一文搞定如何选取最佳CVM云服务器
对于想在云上部署业务的小伙伴来说,尤其是刚接触云服务的朋友,腾讯云云服务器实例规格种类,区域繁多。价格也不尽相同,如何根据自己的业务类型和预算,选择适合自己的腾讯云云服务器。
炒香菇的书呆子
2024/11/18
1800
腾讯云服务器的优势
先为大家带来一点福利。腾讯云最近开始发放代金券了,新客户无门槛领取总价值高达2775元代金券,每种代金券限量500张,先到先得,建议大家都领取一份,反正是免费领的,说不定以后需要呢?
2018/10/22
12.5K0
腾讯云服务器的优势
先为大家带来一点福利。腾讯云最近开始发放代金券了,新客户无门槛领取总价值高达2775元代金券(实际金额以代金券领取页面地址为准(,每种代金券限量500张,先到先得,建议大家都领取一份,反正是免费领的,说不定以后需要呢?
用户5908769
2019/07/26
13.5K0
腾讯云服务器常见问题,用户关心的腾讯云服务器常见问题及答案汇总
本文主要介绍初次接触腾讯云的用户,对于腾讯云服务器的一些问题及答案,帮助用户更好的了解腾讯云服务器,熟悉腾讯云服务器的价格,掌握腾讯云各种活动,更好的使用腾讯云各种云产品。
云小子
2022/06/01
3.3K0
手把手教你购买腾讯云服务器
腾讯云是腾讯公司旗下的云计算服务提供商,提供一系列基础设施和云服务,涵盖了计算、存储、数据库、人工智能、大数据分析、物联网等领域。腾讯云在全球范围内建立了多个数据中心,提供多地域、多可用区的服务支持,为用户提供高可靠性和稳定性的服务。它还提供灵活的付费模式和全面的技术支持,适应了各种规模和类型的用户需求。这里我们将手把手教你如何快速购买腾讯云服务器。
无代码Dev
2024/01/26
1.5K0
手把手教你购买腾讯云服务器
【腾讯云产品最佳实践】腾讯云服务器CVM安装与配置及实际使用
双十一活动入口:https://cloud.tencent.com/act/pro/double11-2024?fromSource=gwzcw.8891746.8891746.8891746
三掌柜
2024/11/18
2050
【腾讯云产品最佳实践】腾讯云服务器CVM安装与配置及实际使用
腾讯云服务器有什么优势?稳定性怎么样?
先为大家带来一点福利。腾讯云最近开始发放代金券了,新客户无门槛领取总价值高达2775元代金券,每种代金券限量500张,先到先得,建议大家都领取一份,反正是免费领的,说不定以后需要呢?
用户5601883
2019/07/16
10.8K0
腾讯云服务器有什么优势?稳定性怎么样?
云主机对比-腾讯云服务器的优势优点有哪些
腾讯云服务器优势优点有哪些?很多朋友在购买云服务器时,会看到腾讯云服务器的品牌,但是对腾讯云的特点缺乏一定的了解,这里我们介绍下腾讯云服务器优势优点有哪些。
tengxunyun8点com活动整理
2019/04/09
41.7K1
云主机对比-腾讯云服务器的优势优点有哪些
哪个云服务最好用,腾讯云的功能与优势
很多用户在初次选择云服务器商家的时候,往往不知道怎么选择哪个云服务商好,因为国内云服务商众多,各有各的特点,但是目前选择腾讯云的用户越来越多了,我们就来说说为什么上云要首选阿里云。
tengxunyun8点com活动整理
2019/04/15
16.7K0
哪个云服务最好用,腾讯云的功能与优势
如何购买腾讯云服务器?『新手小白教程』
腾讯云产品系列有云服务器、云硬盘、云数据库、CDN、云存储等等产品,其中腾讯云服务器,因为用途比较广泛,比如用来运行量化交易系统、跑自动化交易程序、搭建跑外汇MT4/MT5 EA的服务器以及网站建设等等,所以成为众多用户喜爱的选择。 由于很多用户之前没有购买过云服务器,对于有些不熟悉云服务器的新用户可能需要一点指导,所以本文将详细介绍腾讯云服务器的购买过程,并且提供一些优惠购买信息与通道,帮助大家购买适合自己业务需求的高性价比云服务器。
用户2416682
2019/07/24
10.3K0
如何购买腾讯云服务器?『新手小白教程』
腾讯云国际站代理商:为什么选择腾讯云服务器搭建个人博客?
• 高性能服务器:腾讯云服务器提供强大的计算能力,能够满足个人博客从初期到后期访问量增长的各种需求,确保网站加载速度快,用户体验良好。
悟空云_飞机@CloudWuKong
2025/01/21
2280
腾讯云国际站代理商:为什么选择腾讯云服务器搭建个人博客?
什么是云计算?什么是云服务器?有什么用?
很多人都听过云计算和云服务器这几个概念,那么到底什么是云计算、什么是云服务器呢?云服务器又有什么用呢?提供腾讯云特惠1折秒杀活动的“尊托云数-zuntop.com”就带大家一起来了解一下。
用户5175848
2019/07/18
21.6K0
什么是云计算?什么是云服务器?有什么用?
借助腾讯云轻量应用服务器优化双11电商促销活动:成本控制与性能提升的实践
随着云计算技术和互联网应用的快速发展,越来越多的中小型企业和个人开发者需要一个高效、灵活且易于管理的托管平台来支持其业务和应用程序的运行。在这种背景下,轻量应用服务器(Lightweight Application Server,简称LAS)的需求日益增加。它以其简单易用、低运维成本的特点,成为开发者和企业解决应用部署与管理的理想选择。轻量应用服务器的出现,不仅为开发者提供了便捷的开发环境,也为企业带来了更高效的资源利用和成本控制。
小馒头学Python
2024/11/10
1.6K0
借助腾讯云轻量应用服务器优化双11电商促销活动:成本控制与性能提升的实践
腾讯云服务器与普通服务器区别在哪?如何选择?
腾讯云服务器与普通的IDC机房或服务器厂商相比,腾讯云服务器CVM具有高可用性、安全性和弹性优势。小编从以上几个方面详细说下这二者的区别及如何选择。
用户5916283
2019/09/19
7.8K0
腾讯云服务器与普通服务器区别在哪?如何选择?
做网站如何选择云服务器?(二)服务篇
在现代互联网时代,越来越多的企业和个人都选择云服务器来满足业务需求。而随着市场上云服务器品牌的不断增多,各大云服务商纷纷推出各项举措以提升市场竞争力,并满足日益多样化的用户需求。这些举措中包含了哪些服务内容?而哪些方面又应当引起特别的关注呢?今天就来探讨一下这些问题。
云惑雨问
2025/03/10
1790
做网站如何选择云服务器?(二)服务篇
腾讯云 CVM 产品详细信息
腾讯云 CVM 提供了在云中的可扩展的虚拟计算资源,允许您选择多种操作系统来启动 CVM 实例,并加载到您自定义的应用环境。后续随着业务量的变化,您还可以随时调整您的 CVM 规格。
勤劳的小蜜蜂
2019/08/12
3K0
如何挑选一台云服务器
以前,我作为一个个人开发者,倒腾过一些入门级别的云服务器,玩玩技术,不太在意云服务器的性能。
五分钟学算法
2023/01/10
20.6K0
如何挑选一台云服务器
推荐阅读
相关推荐
双十一云服务采购指南:腾讯云服务器CVM安装与配置
更多 >
LV.3
这个人很懒,什么都没有留下~
目录
  • 如何使用nacos config
  • spring boot启动容器如何加载nacos config配置文件
  • nacos config动态刷新
  • nacos config动态监听
    • nacos client处理
    • nacos server处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档