前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal的MysqlDetectingTimeTask

聊聊canal的MysqlDetectingTimeTask

作者头像
code4it
发布2020-04-22 11:56:35
4960
发布2020-04-22 11:56:35
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下canal的MysqlDetectingTimeTask

MysqlDetectingTimeTask

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

代码语言:javascript
复制
    class MysqlDetectingTimeTask extends TimerTask {

        private boolean         reconnect = false;
        private MysqlConnection mysqlConnection;

        public MysqlDetectingTimeTask(MysqlConnection mysqlConnection){
            this.mysqlConnection = mysqlConnection;
        }

        public void run() {
            try {
                if (reconnect) {
                    reconnect = false;
                    mysqlConnection.reconnect();
                } else if (!mysqlConnection.isConnected()) {
                    mysqlConnection.connect();
                }
                Long startTime = System.currentTimeMillis();

                // 可能心跳sql为select 1
                if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
                    mysqlConnection.query(detectingSQL);
                } else {
                    mysqlConnection.update(detectingSQL);
                }

                Long costTime = System.currentTimeMillis() - startTime;
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onSuccess(costTime);
                }
            } catch (SocketTimeoutException e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            } catch (IOException e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            } catch (Throwable e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            }

        }

        public MysqlConnection getMysqlConnection() {
            return mysqlConnection;
        }
    }
  • MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true

HeartBeatCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/HeartBeatCallback.java

代码语言:javascript
复制
public interface HeartBeatCallback {

    /**
     * 心跳发送成功
     */
    public void onSuccess(long costTime);

    /**
     * 心跳发送失败
     */
    public void onFailed(Throwable e);

}
  • HeartBeatCallback接口定义了onSuccess、onFailed方法

HeartBeatHAController

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/ha/HeartBeatHAController.java

代码语言:javascript
复制
public class HeartBeatHAController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback {

    private static final Logger logger              = LoggerFactory.getLogger(HeartBeatHAController.class);
    // default 3 times
    private int                 detectingRetryTimes = 3;
    private int                 failedTimes         = 0;
    private boolean             switchEnable        = false;
    private CanalHASwitchable   eventParser;

    public HeartBeatHAController(){

    }

    public void onSuccess(long costTime) {
        failedTimes = 0;
    }

    public void onFailed(Throwable e) {
        failedTimes++;
        // 检查一下是否超过失败次数
        synchronized (this) {
            if (failedTimes > detectingRetryTimes) {
                if (switchEnable) {
                    eventParser.doSwitch();// 通知执行一次切换
                    failedTimes = 0;
                } else {
                    logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
                }
            }
        }
    }

    // ============================= setter / getter
    // ============================

    public void setCanalHASwitchable(CanalHASwitchable canalHASwitchable) {
        this.eventParser = canalHASwitchable;
    }

    public void setDetectingRetryTimes(int detectingRetryTimes) {
        this.detectingRetryTimes = detectingRetryTimes;
    }

    public void setSwitchEnable(boolean switchEnable) {
        this.switchEnable = switchEnable;
    }

}
  • HeartBeatHAController实现了HeartBeatCallback接口,其onSuccess方法更新failedTimes为0;其onFailed方法递增failedTimes,然后判断failedTimes是否大于detectingRetryTimes,且switchEnable为true则执行eventParser.doSwitch(),然后更新failedTimes为0

doSwitch

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

代码语言:javascript
复制
public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable {

    //......

    // 处理主备切换的逻辑
    public void doSwitch() {
        AuthenticationInfo newRunningInfo = (runningInfo.equals(masterInfo) ? standbyInfo : masterInfo);
        this.doSwitch(newRunningInfo);
    }

    public void doSwitch(AuthenticationInfo newRunningInfo) {
        // 1. 需要停止当前正在复制的过程
        // 2. 找到新的position点
        // 3. 重新建立链接,开始复制数据
        // 切换ip
        String alarmMessage = null;

        if (this.runningInfo.equals(newRunningInfo)) {
            alarmMessage = "same runingInfo switch again : " + runningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            return;
        }

        if (newRunningInfo == null) {
            alarmMessage = "no standby config, just do nothing, will continue try:"
                           + runningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            sendAlarm(destination, alarmMessage);
            return;
        } else {
            stop();
            alarmMessage = "try to ha switch, old:" + runningInfo.getAddress().toString() + ", new:"
                           + newRunningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            sendAlarm(destination, alarmMessage);
            runningInfo = newRunningInfo;
            start();
        }
    }

    //......

}    
  • MysqlEventParser的doSwitch方法先执行stop方法,然后更新runningInfo,最后执行start方法

小结

MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true

doc

  • MysqlDetectingTimeTask
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MysqlDetectingTimeTask
  • HeartBeatCallback
  • HeartBeatHAController
  • doSwitch
  • 小结
  • doc
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档