聊聊nacos的notifyConfigInfo

本文主要研究一下nacos的notifyConfigInfo

CommunicationController

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java

@Controller
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {
​
    private final DumpService dumpService;
​
    private final LongPollingService longPollingService;
​
    private String trueStr = "true";
​
    @Autowired
    public CommunicationController(DumpService dumpService, LongPollingService longPollingService) {
        this.dumpService = dumpService;
        this.longPollingService = longPollingService;
    }
​
    /**
     * 通知配置信息改变
     */
    @RequestMapping(value = "/dataChange", method = RequestMethod.GET)
    @ResponseBody
    public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response,
                                    @RequestParam("dataId") String dataId, @RequestParam("group") String group,
                                    @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
                                        String tenant,
                                    @RequestParam(value = "tag", required = false) String tag) {
        dataId = dataId.trim();
        group = group.trim();
        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
        String isBetaStr = request.getHeader("isBeta");
        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
        } else {
            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
        }
        return true;
    }
​
    //......
}
  • notifyConfigInfo方法主要是执行dumpService.dump方法,只是是否beta调用的dump方法不同

DumpService

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java

@Service
public class DumpService {
​
    @Autowired
    private Environment env;
​
    @Autowired
    PersistService persistService;
​
    @PostConstruct
    public void init() {
        LogUtil.defaultLog.warn("DumpService start");
        DumpProcessor processor = new DumpProcessor(this);
        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
​
        dumpTaskMgr = new TaskManager(
            "com.alibaba.nacos.server.DumpTaskManager");
        dumpTaskMgr.setDefaultTaskProcessor(processor);
​
        dumpAllTaskMgr = new TaskManager(
            "com.alibaba.nacos.server.DumpAllTaskManager");
        dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
​
        //......
    }    
​
    /**
     * 全量dump间隔
     */
    static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;
    /**
     * 全量dump间隔
     */
    static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;
​
    private TaskManager dumpTaskMgr;
    private TaskManager dumpAllTaskMgr;
​
    private static final Logger log = LoggerFactory.getLogger(DumpService.class);
​
    static final AtomicInteger FINISHED = new AtomicInteger();
​
    static final int INIT_THREAD_COUNT = 10;
    int total = 0;
    private final static String TRUE_STR = "true";
    private final static String BETA_TABLE_NAME = "config_info_beta";
    private final static String TAG_TABLE_NAME = "config_info_tag";
​
    Boolean isQuickStart = false;
​
    private int retentionDays = 30;
​
​
    //......
​
    public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    }
​
    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
        dump(dataId, group, tenant, tag, lastModified, handleIp, false);
    }
​
    public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
                     boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    }
​
    //......
​
}
  • dump方法最后是往dumpTaskMgr添加DumpTask;dumpTaskMgr的defaultTaskProcessor为dumpProcessor

TaskManager

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java

public final class TaskManager implements TaskManagerMBean {
​
    private static final Logger log = LogUtil.defaultLog;
​
    private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();
​
    private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =
        new ConcurrentHashMap<String, TaskProcessor>();
​
    private TaskProcessor defaultTaskProcessor;
​
    Thread processingThread;
​
    private final AtomicBoolean closed = new AtomicBoolean(true);
​
    private String name;
​
    class ProcessRunnable implements Runnable {
​
        @Override
        public void run() {
            while (!TaskManager.this.closed.get()) {
                try {
                    Thread.sleep(100);
                    TaskManager.this.process();
                } catch (Throwable e) {
                }
            }
​
        }
​
    }
​
    ReentrantLock lock = new ReentrantLock();
​
    Condition notEmpty = this.lock.newCondition();
​
    public TaskManager() {
        this(null);
    }
​
    public AbstractTask getTask(String type) {
        return this.tasks.get(type);
    }
​
    public TaskProcessor getTaskProcessor(String type) {
        return this.taskProcessors.get(type);
    }
​
    @SuppressWarnings("PMD.AvoidManuallyCreateThreadRule")
    public TaskManager(String name) {
        this.name = name;
        if (null != name && name.length() > 0) {
            this.processingThread = new Thread(new ProcessRunnable(), name);
        } else {
            this.processingThread = new Thread(new ProcessRunnable());
        }
        this.processingThread.setDaemon(true);
        this.closed.set(false);
        this.processingThread.start();
    }
​
    //......
​
    /**
     * 将任务加入到任务Map中
     *
     * @param type
     * @param task
     */
    public void addTask(String type, AbstractTask task) {
        this.lock.lock();
        try {
            AbstractTask oldTask = tasks.put(type, task);
            MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
            if (null != oldTask) {
                task.merge(oldTask);
            }
        } finally {
            this.lock.unlock();
        }
    }
​
    protected void process() {
        for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
            AbstractTask task = null;
            this.lock.lock();
            try {
                // 获取任务
                task = entry.getValue();
                if (null != task) {
                    if (!task.shouldProcess()) {
                        // 任务当前不需要被执行,直接跳过
                        continue;
                    }
                    // 先将任务从任务Map中删除
                    this.tasks.remove(entry.getKey());
                    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
                }
            } finally {
                this.lock.unlock();
            }
​
            if (null != task) {
                // 获取任务处理器
                TaskProcessor processor = this.taskProcessors.get(entry.getKey());
                if (null == processor) {
                    // 如果没有根据任务类型设置的处理器,使用默认处理器
                    processor = this.getDefaultTaskProcessor();
                }
                if (null != processor) {
                    boolean result = false;
                    try {
                        // 处理任务
                        result = processor.process(entry.getKey(), task);
                    } catch (Throwable t) {
                        log.error("task_fail", "处理task失败", t);
                    }
                    if (!result) {
                        // 任务处理失败,设置最后处理时间
                        task.setLastProcessTime(System.currentTimeMillis());
​
                        // 将任务重新加入到任务Map中
                        this.addTask(entry.getKey(), task);
                    }
                }
            }
        }
​
        if (tasks.isEmpty()) {
            this.lock.lock();
            try {
                this.notEmpty.signalAll();
            } finally {
                this.lock.unlock();
            }
        }
    }
​
    //......
}
  • TaskManager的addTask方法往tasks添加AbstractTask;其构造器启动了ProcessRunnable,其run方法主要是执行TaskManager.this.process()方法;该方法会遍历tasks,取出任务,然后通过TaskProcessor的process方法来执行任务

DumpProcessor

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpTask.java

class DumpProcessor implements TaskProcessor {
​
    DumpProcessor(DumpService dumpService) {
        this.dumpService = dumpService;
    }
​
    @Override
    public boolean process(String taskType, AbstractTask task) {
        DumpTask dumpTask = (DumpTask)task;
        String[] pair = GroupKey2.parseKey(dumpTask.groupKey);
        String dataId = pair[0];
        String group = pair[1];
        String tenant = pair[2];
        long lastModified = dumpTask.lastModified;
        String handleIp = dumpTask.handleIp;
        boolean isBeta = dumpTask.isBeta;
        String tag = dumpTask.tag;
        if (isBeta) {
            // beta发布,则dump数据,更新beta缓存
            ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant);
            boolean result;
            if (null != cf) {
                result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps());
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                        ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                        cf.getContent().length());
                }
            } else {
                result = ConfigService.removeBeta(dataId, group, tenant);
                if (result) {
                    ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                        ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                }
            }
            return result;
        } else {
            if (StringUtils.isBlank(tag)) {
                ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant);
                if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
                    if (null != cf) {
                        AggrWhitelist.load(cf.getContent());
                    } else {
                        AggrWhitelist.load(null);
                    }
                }
​
                if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                    if (null != cf) {
                        ClientIpWhiteList.load(cf.getContent());
                    } else {
                        ClientIpWhiteList.load(null);
                    }
                }
​
                if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
                    if (null != cf) {
                        SwitchService.load(cf.getContent());
                    } else {
                        SwitchService.load(null);
                    }
                }
​
                boolean result;
                if (null != cf) {
                    result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified);
​
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            cf.getContent().length());
                    }
                } else {
                    result = ConfigService.remove(dataId, group, tenant);
​
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                    }
                }
                return result;
            } else {
                ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
                //
                boolean result;
                if (null != cf) {
                    result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified);
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                            cf.getContent().length());
                    }
                } else {
                    result = ConfigService.removeTag(dataId, group, tenant, tag);
                    if (result) {
                        ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp,
                            ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
                    }
                }
                return result;
            }
        }
​
    }
​
    final DumpService dumpService;
}
  • DumpProcessor实现了TaskProcessor接口,其process方法主要是根据不同条件执行ConfigService.dump或者remove方法

小结

notifyConfigInfo方法主要是执行dumpService.dump方法,只是是否beta调用的dump方法不同

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏逆向与安全

基于设备指纹零感验证系统

作者: 我是小三 博客: http://www.cnblogs.com/2014asm/ 由于时间和水平有限,本文会存在诸多不足,希望得到您的及时反馈与指正,多...

18020
来自专栏MongoDB中文社区

使用JMeter做MongoDB性能测试

对大多数应用环境来说,数据库是一个关键要素。如何存储数据以及在哪里存储数据,对整个系统的性能会产生巨大影响。因此,在做开发之前,数据库的选择肯定是最重要的决定之...

12920
来自专栏Rust语言学习交流

一起学Rust-实战leetcode(三)

之后,你的输出需要从左往右逐行读取,产生出一个新的字符串,比如:"LCIRETOESIIGEDHN"。

10240
来自专栏A周立SpringCloud

让人头大的各种锁,从这里让你思绪清晰

说到了锁我们经常会联想到生活中的锁,在我们日常中我们经常会接触到锁。比如我们的手机锁,电脑锁,再比如我们生活中的门锁,这些都是锁。

11820
来自专栏卓文见识

Weblogic反序列化历史漏洞全汇总

序列化是让Java对象脱离Java运行环境的一种手段,可以有效的实现多平台之间的通信、对象持久化存储。

51320
来自专栏木东居士的专栏

憋瞎说,大数据不是你想的那样!

学生党以及很多没设计过大数据开发的小伙伴呢,都对大数据这么一个领域感到非常非常的好奇非常非常的神秘,我今天就非要戳穿给你们看。

9720
来自专栏java达人

透过源码学习设计模式7-适配器模式与HandlerApapter

适配器模式把一个类的接口,变换成客户端所期待的另一种接口,使原本因接口不匹配的两个类能够在一起工作。

8530
来自专栏故久

itext根据模板生成pdf(支持分页)

// 利用模板生成pdf public static void pdfout(Map<String,Object> o,String newPDFPa...

60320
来自专栏Java架构师历程

利用nohup后台运行jar文件包程序

java -jar XXX.jar 特点:当前ssh窗口被锁定,可按CTRL + C打断程序运行,或直接关闭窗口,程序退出

8530
来自专栏.Net、.Net Core 、Docker

通俗易懂设计模式解析——享元模式

  今天我们继续讲述设计模式,今天提及的是享元模式,享——共享。之前不是出现了一系列共享的东西吗?为啥呀,还不就是有些东西每个人都需要,但是每个人都去买一个又...

8230

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励