前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal的ApplicationConfigMonitor

聊聊canal的ApplicationConfigMonitor

作者头像
code4it
发布2020-04-14 11:55:20
5920
发布2020-04-14 11:55:20
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下canal的ApplicationConfigMonitor

ApplicationConfigMonitor

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java

代码语言:javascript
复制
@Component
public class ApplicationConfigMonitor {

    private static final Logger   logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class);

    @Resource
    private ContextRefresher      contextRefresher;

    @Resource
    private CanalAdapterService   canalAdapterService;

    private FileAlterationMonitor fileMonitor;

    @PostConstruct
    public void init() {
        File confDir = Util.getConfDirPath();
        try {
            FileAlterationObserver observer = new FileAlterationObserver(confDir,
                FileFilterUtils.and(FileFilterUtils.fileFileFilter(),
                    FileFilterUtils.prefixFileFilter("application"),
                    FileFilterUtils.suffixFileFilter("yml")));
            FileListener listener = new FileListener();
            observer.addListener(listener);
            fileMonitor = new FileAlterationMonitor(3000, observer);
            fileMonitor.start();

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @PreDestroy
    public void destroy() {
        try {
            fileMonitor.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private class FileListener extends FileAlterationListenerAdaptor {

        @Override
        public void onFileChange(File file) {
            super.onFileChange(file);
            try {
                // 检查yml格式
                new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);

                canalAdapterService.destroy();

                // refresh context
                contextRefresher.refresh();

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // ignore
                }
                canalAdapterService.init();
                logger.info("## adapter application config reloaded.");
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
  • ApplicationConfigMonitor在PostConstruct的时候创建FileAlterationObserver,添加FileListener,然后使用FileAlterationObserver创建FileAlterationMonitor,然后执行其start方法;在PreDestroy的时候执行fileMonitor.stop();FileListener继承了FileAlterationListenerAdaptor,其onFileChange方法会检查yml格式,执行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()

FileAlterationMonitor

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationMonitor.java

代码语言:javascript
复制
public final class FileAlterationMonitor implements Runnable {

    private final long interval;
    private final List<FileAlterationObserver> observers = new CopyOnWriteArrayList<FileAlterationObserver>();
    private Thread thread = null;
    private ThreadFactory threadFactory;
    private volatile boolean running = false;

    public FileAlterationMonitor() {
        this(10000);
    }

    public FileAlterationMonitor(long interval) {
        this.interval = interval;
    }

    public FileAlterationMonitor(long interval, FileAlterationObserver... observers) {
        this(interval);
        if (observers != null) {
            for (FileAlterationObserver observer : observers) {
                addObserver(observer);
            }
        }
    }

    //......

    public synchronized void start() throws Exception {
        if (running) {
            throw new IllegalStateException("Monitor is already running");
        }
        for (FileAlterationObserver observer : observers) {
            observer.initialize();
        }
        running = true;
        if (threadFactory != null) {
            thread = threadFactory.newThread(this);
        } else {
            thread = new Thread(this);
        }
        thread.start();
    }

    /**
     * Stop monitoring.
     *
     * @throws Exception if an error occurs initializing the observer
     */
    public synchronized void stop() throws Exception {
        stop(interval);
    }

    /**
     * Stop monitoring.
     *
     * @param stopInterval the amount of time in milliseconds to wait for the thread to finish.
     * A value of zero will wait until the thread is finished (see {@link Thread#join(long)}).
     * @throws Exception if an error occurs initializing the observer
     * @since 2.1
     */
    public synchronized void stop(long stopInterval) throws Exception {
        if (running == false) {
            throw new IllegalStateException("Monitor is not running");
        }
        running = false;
        try {
            thread.join(stopInterval);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (FileAlterationObserver observer : observers) {
            observer.destroy();
        }
    }

    public void run() {
        while (running) {
            for (FileAlterationObserver observer : observers) {
                observer.checkAndNotify();
            }
            if (!running) {
                break;
            }
            try {
                Thread.sleep(interval);
            } catch (final InterruptedException ignored) {
            }
        }
    }    
}
  • FileAlterationMonitor的start方法会使用自己的runnable创建Thread,然后执行thread.start();其stop方法则执行thread.join(stopInterval),然后遍历observers,执行observer.destroy();其run方法则遍历observers,执行observer.checkAndNotify()

FileAlterationObserver

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationObserver.java

代码语言:javascript
复制
public class FileAlterationObserver implements Serializable {

    private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<FileAlterationListener>();
    private final FileEntry rootEntry;
    private final FileFilter fileFilter;
    private final Comparator<File> comparator;

    //......

    public void checkAndNotify() {

        /* fire onStart() */
        for (FileAlterationListener listener : listeners) {
            listener.onStart(this);
        }

        /* fire directory/file events */
        File rootFile = rootEntry.getFile();
        if (rootFile.exists()) {
            checkAndNotify(rootEntry, rootEntry.getChildren(), listFiles(rootFile));
        } else if (rootEntry.isExists()) {
            checkAndNotify(rootEntry, rootEntry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
        } else {
            // Didn't exist and still doesn't
        }

        /* fire onStop() */
        for (FileAlterationListener listener : listeners) {
            listener.onStop(this);
        }
    }

    private void checkAndNotify(FileEntry parent, FileEntry[] previous, File[] files) {
        int c = 0;
        FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : FileEntry.EMPTY_ENTRIES;
        for (FileEntry entry : previous) {
            while (c < files.length && comparator.compare(entry.getFile(), files[c]) > 0) {
                current[c] = createFileEntry(parent, files[c]);
                doCreate(current[c]);
                c++;
            }
            if (c < files.length && comparator.compare(entry.getFile(), files[c]) == 0) {
                doMatch(entry, files[c]);
                checkAndNotify(entry, entry.getChildren(), listFiles(files[c]));
                current[c] = entry;
                c++;
            } else {
                checkAndNotify(entry, entry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
                doDelete(entry);
            }
        }
        for (; c < files.length; c++) {
            current[c] = createFileEntry(parent, files[c]);
            doCreate(current[c]);
        }
        parent.setChildren(current);
    }

    //......
}
  • FileAlterationObserver的checkAndNotify方法会遍历之前的FileEntry,然后使用NameFileComparator递归遍历对比文件变化,分别触发doCreate、doMatch、doDelete,他们会回调FileAlterationListener的对应方法

小结

ApplicationConfigMonitor在PostConstruct的时候创建FileAlterationObserver,添加FileListener,然后使用FileAlterationObserver创建FileAlterationMonitor,然后执行其start方法;在PreDestroy的时候执行fileMonitor.stop();FileListener继承了FileAlterationListenerAdaptor,其onFileChange方法会检查yml格式,执行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()

doc

  • ApplicationConfigMonitor
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ApplicationConfigMonitor
  • FileAlterationMonitor
  • FileAlterationObserver
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档