前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >canal 源码解析系列-CanalInstance模块解析

canal 源码解析系列-CanalInstance模块解析

作者头像
用户7634691
发布2021-09-08 11:32:15
3460
发布2021-09-08 11:32:15
举报

下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解

正文

上一篇文章

canal 源码解析系列-CanalServerWithEmbedded解读

提到了CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。

本篇就来深入解读下CanalInstance模块。先看幅图,

instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。每个组件后面我们会有单独的文章专门分析。

从这幅图我们可以看出instance是怎么生成的。

CanalInstanceGenerator相当于一个工厂类,通过 destination 产生特定的 CanalInstance,它有两个实现:

  • ManagerCanalInstanceGenerator类,manager方式:和你自己的内部web console/manager系统进行对接。(目前主要是公司内部使用)
  • SpringCanalInstanceGenerator类,spring方式:基于spring xml + properties进行定义,构建spring配置.

具体使用哪个,是通过配置的,如下所示:

canal.properties文件

代码语言:javascript
复制
canal.instance.global.mode = spring

com.alibaba.otter.canal.deployer.CanalController#initGlobalConfig方法,

代码语言:javascript
复制
...
if (config.getMode().isManager()) {
                PlainCanalInstanceGenerator instanceGenerator = new PlainCanalInstanceGenerator(properties);
                instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
                instanceGenerator.setSpringXml(config.getSpringXml());
                return instanceGenerator.generate(destination);
            } else if (config.getMode().isSpring()) {
                SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();
                instanceGenerator.setSpringXml(config.getSpringXml());
                return instanceGenerator.generate(destination);
            } else {
                throw new UnsupportedOperationException("unknow mode :" + config.getMode());
            }
            ...

先来看下spring的版本实现,

代码语言:javascript
复制
public CanalInstance generate(String destination) {
        synchronized (CanalEventParser.class) {
            try {
                // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                System.setProperty("canal.instance.destination", destination);
                //file-instance.xml文件,里面定义了一些bean
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

            ...
        }
    }

canal提供了几种spring配置文件的模版给我们选择,如下图所示:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

然后部署的时候,我们可以通过在canal.properties配置文件中指定使用哪个文件:

代码语言:javascript
复制
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

这几个文件的主要区别是,metaManager 和eventParser 这两个配置有所不同,可能在内存、文件或zk进行存储。

spring的配置文件示例如下:

代码语言:javascript
复制
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
        <property name="destination" value="${canal.instance.destination}" />
        <property name="eventParser">
            <ref bean="eventParser" />
        </property>
        <property name="eventSink">
            <ref bean="eventSink" />
        </property>
        <property name="eventStore">
            <ref bean="eventStore" />
        </property>
        <property name="metaManager">
            <ref bean="metaManager" />
        </property>
        <property name="alarmHandler">
            <ref bean="alarmHandler" />
        </property>
    </bean>

generate方法返回的是CanalInstanceWithSpring这个实现类,它继承自AbstractCanalInstance,并且实现了CanalInstance。这个类的实现只有几十行,之所以这么少是因为大部分的逻辑都已经通过spring的配置文件实现了,如下:

代码语言:javascript
复制
<!-- 报警处理类 -->
    <bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />

    <bean id="metaManager" class="com.alibaba.otter.canal.meta.FileMixedMetaManager">
        <property name="dataDir" value="${canal.file.data.dir:../conf}" />
        <property name="period" value="${canal.file.flush.period:1000}" />
    </bean>

    <bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer">
        <property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" />
        <property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" />
        <property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" />
    </bean>

    <bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink">
        <property name="eventStore" ref="eventStore" />
    </bean>
    ...

ManagerCanalInstanceGenerator实现类似,这里就不多说了。它返回的是CanalInstanceWithManager,同样它继承自AbstractCanalInstance,并且实现了CanalInstance。类图如下:

核心的功能都在AbstractCanalInstance类中,我们来看下。

代码语言:javascript
复制
// 通知下订阅关系变化
    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            //订阅关系发生变化触发,主要是更新filter,这个filter决定了binlog订阅的库,表
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 处理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                    if(singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if(eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }

        }

        // filter的处理规则
        // a. parser处理数据过滤处理
        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
        // 后续内存版的一对多分发,可以考虑
        return true;
    }

start方法和stop方法没什么可讲的,就是启停instance内部的组件。

beforeStartEventParserafterStartEventParser是eventParser启动的前置和后置操作。前者调用了startEventParserInternal,后者调用了stopEventParserInternal

代码语言:javascript
复制
protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
        if (eventParser instanceof AbstractEventParser) {
           ...
            // 首先启动log position管理器
            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
            if (!logPositionManager.isStart()) {
                logPositionManager.start();
            }
        }

            ...
            CanalHAController haController = mysqlEventParser.getHaController();
            ...
                haController.start();

        }
    }
代码语言:javascript
复制
protected void stopEventParserInternal(CanalEventParser eventParser) {
        if (eventParser instanceof AbstractEventParser) {
            ...
            // 首先启动log position管理器
            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
            if (logPositionManager.isStart()) {
                logPositionManager.stop();
            }
        }

            ....
            CanalHAController haController = mysqlEventParser.getHaController();
            if (haController.isStart()) {
                haController.stop();
            }
    }

就是分别负责了CanalLogPositionManagerCanalHAController的启动停止工作。

CanalLogPositionManager记录binlog最后一次解析成功位置,有不同的实现,可以保存在内存,zk等存在介质里。mysql在主从同步过程中,slave自己需要维护binlog的消费进度信息。而canal伪装成slave,因此也要维护这样的信息。

CanalHAController主要是通过失败检测, 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库。它只有一个实现类HeartBeatHAController

失败转换的逻辑也很简单,定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机

代码语言:javascript
复制
//心跳发送成功
    public void onSuccess(long costTime) {
        failedTimes = 0;
    }

    //心跳发送失败
    public void onFailed(Throwable e) {
        failedTimes++;
        // 检查一下是否超过失败次数
        synchronized (this) {
            if (failedTimes > detectingRetryTimes) {
                if (switchEnable) {
                    eventParser.doSwitch();// 通知执行一次切换
                    failedTimes = 0;
                } else {
                    logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
                }
            }
        }
    }

心跳的逻辑在 com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.MysqlDetectingTimeTask 实现,是个定时器。

代码语言:javascript
复制
class MysqlDetectingTimeTask extends TimerTask {

       ...

        public void run() {
            try {
                ...

                // 可能心跳sql为select 1
                if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
                    mysqlConnection.query(detectingSQL);
                } else {
                    mysqlConnection.update(detectingSQL);
                }

                long costTime = System.currentTimeMillis() - startTime;
                if (haController != null && haController instanceof HeartBeatCallback) {
                //成功
                    ((HeartBeatCallback) haController).onSuccess(costTime);
                }
            } catch (Throwable e) {
            //失败
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            }

        }

       ...
    }

总结

总体来看,CanalInstance模块本身没有什么特别复杂的逻辑,它的核心处理都在parser、sink、store、metamanager等内部组件里。这些内部组件我们后面会有文章单独分析。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档