A Look at canal's MysqlConnection

code4it
Published 2020-04-26 16:59:35
This article is part of the column: 码匠的流水账

This article takes a look at canal's MysqlConnection.

ErosaConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

public interface ErosaConnection {

    public void connect() throws IOException;

    public void reconnect() throws IOException;

    public void disconnect() throws IOException;

    /**
     * Used for fast data lookup; unlike dump, seek only yields part of the data
     */
    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException;

    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    public void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * Sync the binlog by GTID
     */
    public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException;

    // -------------

    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;

    public void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException;

    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;

    ErosaConnection fork();

    public long queryServerId() throws IOException;
}
  • The ErosaConnection interface defines the connect, reconnect, disconnect, seek, dump, fork, and queryServerId methods
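
The callback style of seek and dump can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than canal code: `Sink` mimics the role of SinkFunction (returning false asks the connection to stop), and `InMemoryConnection` replays a fixed list of strings instead of reading a binlog.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SinkContractSketch {

    /** Hypothetical stand-in for canal's SinkFunction: return false to stop the stream. */
    interface Sink<E> {
        boolean sink(E event);
    }

    /** Hypothetical in-memory "connection" that replays a fixed list of events. */
    static final class InMemoryConnection {
        private final List<String> events;
        private boolean connected;

        InMemoryConnection(List<String> events) {
            this.events = events;
        }

        void connect() {
            connected = true;
        }

        void disconnect() {
            connected = false;
        }

        boolean isConnected() {
            return connected;
        }

        /** Push events to the sink until it returns false or the events run out. */
        void dump(Sink<String> func) {
            if (!connected) {
                throw new IllegalStateException("not connected");
            }
            for (String event : events) {
                if (!func.sink(event)) {
                    break;
                }
            }
        }
    }

    /** Connect, collect events up to and including stopAt, then always disconnect. */
    static List<String> readUntil(InMemoryConnection conn, String stopAt) {
        List<String> seen = new ArrayList<>();
        conn.connect();
        try {
            conn.dump(event -> {
                seen.add(event);
                return !event.equals(stopAt);
            });
        } finally {
            conn.disconnect();
        }
        return seen;
    }

    public static void main(String[] args) {
        InMemoryConnection conn = new InMemoryConnection(
            Arrays.asList("ROTATE", "FORMAT_DESCRIPTION", "QUERY", "XID", "QUERY"));
        System.out.println(readUntil(conn, "XID"));
    }
}
```

The point of the sketch is only the control flow: the connection owns the loop, and the caller decides when to stop through the boolean it returns.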

MysqlConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

    private static final Logger logger         = LoggerFactory.getLogger(MysqlConnection.class);

    private MysqlConnector      connector;
    private long                slaveId;
    private Charset             charset        = Charset.forName("UTF-8");
    private BinlogFormat        binlogFormat;
    private BinlogImage         binlogImage;

    // tsdb releated
    private AuthenticationInfo  authInfo;
    protected int               connTimeout    = 5 * 1000;                                      // 5 seconds
    protected int               soTimeout      = 60 * 60 * 1000;                                // 1 hour
    private int                 binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
    // dump binlog bytes, not yet including meta and TSDB
    private AtomicLong          receivedBinlogBytes;

    public MysqlConnection(){
    }

    public MysqlConnection(InetSocketAddress address, String username, String password){
        authInfo = new AuthenticationInfo();
        authInfo.setAddress(address);
        authInfo.setUsername(username);
        authInfo.setPassword(password);
        connector = new MysqlConnector(address, username, password);
        // pass the connection's parameters through to the connector
        connector.setSoTimeout(soTimeout);
        connector.setConnTimeout(connTimeout);
    }

    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
                           String defaultSchema){
        authInfo = new AuthenticationInfo();
        authInfo.setAddress(address);
        authInfo.setUsername(username);
        authInfo.setPassword(password);
        authInfo.setDefaultDatabaseName(defaultSchema);
        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
        // pass the connection's parameters through to the connector
        connector.setSoTimeout(soTimeout);
        connector.setConnTimeout(connTimeout);
    }

    public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }

    public MysqlConnection fork() {
        MysqlConnection connection = new MysqlConnection();
        connection.setCharset(getCharset());
        connection.setSlaveId(getSlaveId());
        connection.setConnector(connector.fork());
        // set authInfo
        connection.setAuthInfo(authInfo);
        return connection;
    }

    @Override
    public long queryServerId() throws IOException {
        ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
        List<String> fieldValues = resultSetPacket.getFieldValues();
        if (fieldValues == null || fieldValues.size() != 2) {
            return 0;
        }
        return NumberUtils.toLong(fieldValues.get(1));
    }

    public ResultSetPacket query(String cmd) throws IOException {
        MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);
        return exector.query(cmd);
    }

    //......

}
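  • MysqlConnection implements the ErosaConnection interface. Its constructors build an AuthenticationInfo and a MysqlConnector; its connect, reconnect, and disconnect methods delegate directly to the MysqlConnector; its fork method creates a new MysqlConnection around connector.fork(); and its queryServerId method runs show variables like 'server_id'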
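
The result handling in queryServerId can be sketched on its own. This is a minimal illustration, assuming the query returns one row flattened into two field values (variable name, then value), as the size check in the listing implies. `ServerIdSketch` and `parseServerId` are hypothetical names, and `Long.parseLong` with a fallback stands in for the NumberUtils.toLong call used by canal.

```java
import java.util.Arrays;
import java.util.List;

final class ServerIdSketch {

    /**
     * Mirrors the shape of queryServerId: expect exactly two field values
     * (name, value) and fall back to 0 for anything else.
     */
    static long parseServerId(List<String> fieldValues) {
        if (fieldValues == null || fieldValues.size() != 2) {
            return 0L;
        }
        try {
            return Long.parseLong(fieldValues.get(1));
        } catch (NumberFormatException e) {
            // Stand-in for NumberUtils.toLong, which yields 0 when conversion fails.
            return 0L;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseServerId(Arrays.asList("server_id", "1234")));
        System.out.println(parseServerId(Arrays.asList("server_id")));
    }
}
```

A caller therefore cannot distinguish "query returned nothing usable" from a server whose id is really 0, which is worth keeping in mind when the value is used for comparisons.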

seek

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

    //......

    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder();
        decoder.handle(LogEvent.ROTATE_EVENT);
        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
        decoder.handle(LogEvent.QUERY_EVENT);
        decoder.handle(LogEvent.XID_EVENT);
        LogContext context = new LogContext();
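        // If the entry position carries a gtid, use the passed-in gtid as the basis
        // for building the gtidSet; otherwise the gtid is lost when gtid and tsdb are both enabled,
        // and when the source database has purged gtids, an error like the following is reported:
        // 'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
        // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...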
        if (StringUtils.isNotEmpty(gtid)) {
            decoder.handle(LogEvent.GTID_LOG_EVENT);
            context.setGtidSet(MysqlGTIDSet.parse(gtid));
        }
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    }

    private void updateSettings() throws IOException {
        try {
            update("set wait_timeout=9999999");
        } catch (Exception e) {
            logger.warn("update wait_timeout failed", e);
        }
        try {
            update("set net_write_timeout=1800");
        } catch (Exception e) {
            logger.warn("update net_write_timeout failed", e);
        }

        try {
            update("set net_read_timeout=1800");
        } catch (Exception e) {
            logger.warn("update net_read_timeout failed", e);
        }

        try {
            // Tell the server not to transcode results: send them in the database's binary encoding and let the client convert as needed
            update("set names 'binary'");
        } catch (Exception e) {
            logger.warn("update names failed", e);
        }

        try {
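            // mysql 5.6 requires a session variable to be set for checksum support
            // If it is not set, this error occurs: Slave can not handle replication events with the
            // checksum that master is configured to log
            // It must not be set arbitrarily either: it has to match the mysql server's checksum setting, otherwise RotateLogEvent comes out garbled
            // '@@global.binlog_checksum' must be written without single quotes; with them, the master exits on mysql 5.6.29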
            update("set @master_binlog_checksum= @@global.binlog_checksum");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update master_binlog_checksum failed", e);
            }
        }

        try {
            // See: https://github.com/alibaba/canal/issues/284
            // mysql 5.6 requires slave_uuid to be set so that the server does not kill the connection
            update("set @slave_uuid=uuid()");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update slave_uuid failed", e);
            }
        }

        try {
            // mariadb requires a session variable to be set for certain special types
            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update mariadb_slave_capability failed", e);
            }
        }

        /**
         * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between
         * replication heartbeats. Whenever the master's binary log is updated
         * with an event, the waiting period for the next heartbeat is reset.
         * interval is a decimal value having the range 0 to 4294967 seconds and
         * a resolution in milliseconds; the smallest nonzero value is 0.001.
         * Heartbeats are sent by the master only if there are no unsent events
         * in the binary log file for a period longer than interval.
         */
        try {
            long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
            update("SET @master_heartbeat_period=" + periodNano);
        } catch (Exception e) {
            logger.warn("update master_heartbeat_period failed", e);
        }
    }

    private void loadBinlogChecksum() {
        ResultSetPacket rs = null;
        try {
            rs = query("select @@global.binlog_checksum");
            List<String> columnValues = rs.getFieldValues();
            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null
                && columnValues.get(0).toUpperCase().equals("CRC32")) {
                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
            } else {
                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
            }
        } catch (Throwable e) {
            // logger.error("", e);
            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
        }
    }

    private void sendBinlogDump(String binlogfilename, Long binlogPosition) throws IOException {
        BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
        binlogDumpCmd.binlogFileName = binlogfilename;
        binlogDumpCmd.binlogPosition = binlogPosition;
        binlogDumpCmd.slaveServerId = this.slaveId;
        byte[] cmdBody = binlogDumpCmd.toBytes();

        logger.info("COM_BINLOG_DUMP with position:{}", binlogDumpCmd);
        HeaderPacket binlogDumpHeader = new HeaderPacket();
        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
        connector.setDumping(true);
    }

    //......
}
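  • The seek method first calls updateSettings, loadBinlogChecksum, and sendBinlogDump, then creates a DirectLogFetcher to fetch data. updateSettings sets session parameters such as wait_timeout, net_write_timeout, net_read_timeout, and master_heartbeat_period; loadBinlogChecksum reads the master's @@global.binlog_checksum and records whether CRC32 is in use, falling back to OFF on any failure; sendBinlogDump builds and sends a BinlogDumpCommandPacket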
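
To make the sendBinlogDump step more concrete, here is a minimal sketch of how a COM_BINLOG_DUMP request is laid out on the wire according to the MySQL client/server protocol: a 4-byte packet header (3-byte little-endian body length plus a sequence number), then the command byte 0x12, a 4-byte position, 2-byte flags, a 4-byte server id, and the binlog file name. This is not canal's BinlogDumpCommandPacket.toBytes; the class and method names are hypothetical, and the flags field is simply set to 0 here as an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class BinlogDumpPacketSketch {

    static final int COM_BINLOG_DUMP = 0x12;

    /** Write the lowest `bytes` bytes of value in little-endian order. */
    static void writeLE(ByteArrayOutputStream out, long value, int bytes) {
        for (int i = 0; i < bytes; i++) {
            out.write((int) ((value >>> (8 * i)) & 0xFF));
        }
    }

    /** Command body: command byte, position, flags, server id, file name. */
    static byte[] body(String fileName, long position, long serverId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(COM_BINLOG_DUMP);
        writeLE(out, position, 4);
        writeLE(out, 0, 2); // flags: 0 in this sketch (assumption)
        writeLE(out, serverId, 4);
        byte[] name = fileName.getBytes(StandardCharsets.UTF_8);
        out.write(name, 0, name.length);
        return out.toByteArray();
    }

    /** Packet header: 3-byte little-endian body length, then the sequence number. */
    static byte[] header(int bodyLength, int sequence) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLE(out, bodyLength, 3);
        out.write(sequence & 0xFF);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = body("mysql-bin.000001", 4L, 1234L);
        byte[] header = header(body.length, 0);
        System.out.println("header bytes: " + header.length + ", body bytes: " + body.length);
    }
}
```

The sequence number 0 matches the setPacketSequenceNumber((byte) 0x00) call in the listing, since a new command starts a new packet sequence.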

dump

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

    //......

    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendRegisterSlave();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }

            if (event.getSemival() == 1) {
                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
            }
        }
    }

    public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDumpGTID(gtidSet);

        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
            // fix bug: #890 pass the gtid into the context for decode to use
            context.setGtidSet(gtidSet);
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogEvent event = null;
                event = decoder.decode(fetcher, context);

                if (event == null) {
                    throw new CanalParseException("parse failed");
                }

                if (!func.sink(event)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendRegisterSlave();
        sendBinlogDump(binlogfilename, binlogPosition);
        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogBuffer buffer = fetcher.duplicate();
                fetcher.consume(fetcher.limit());
                if (!coprocessor.publish(buffer)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDumpGTID(gtidSet);
        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogBuffer buffer = fetcher.duplicate();
                fetcher.consume(fetcher.limit());
                if (!coprocessor.publish(buffer)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    private void sendRegisterSlave() throws IOException {
        RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
        SocketAddress socketAddress = connector.getChannel().getLocalSocketAddress();
        if (socketAddress == null || !(socketAddress instanceof InetSocketAddress)) {
            return;
        }

        InetSocketAddress address = (InetSocketAddress) socketAddress;
        String host = address.getHostString();
        int port = address.getPort();
        cmd.reportHost = host;
        cmd.reportPort = port;
        cmd.reportPasswd = authInfo.getPassword();
        cmd.reportUser = authInfo.getUsername();
        cmd.serverId = this.slaveId;
        byte[] cmdBody = cmd.toBytes();

        logger.info("Register slave {}", cmd);

        HeaderPacket header = new HeaderPacket();
        header.setPacketBodyLength(cmdBody.length);
        header.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

        header = PacketManager.readHeader(connector.getChannel(), 4);
        byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
        assert body != null;
        if (body[0] < 0) {
            if (body[0] == -1) {
                ErrorPacket err = new ErrorPacket();
                err.fromBytes(body);
                throw new IOException("Error When doing Register slave:" + err.toString());
            } else {
                throw new IOException("unpexpected packet with field_count=" + body[0]);
            }
        }
    }

    private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
        BinlogDumpGTIDCommandPacket binlogDumpCmd = new BinlogDumpGTIDCommandPacket();
        binlogDumpCmd.slaveServerId = this.slaveId;
        binlogDumpCmd.gtidSet = gtidSet;
        byte[] cmdBody = binlogDumpCmd.toBytes();

        logger.info("COM_BINLOG_DUMP_GTID:{}", binlogDumpCmd);
        HeaderPacket binlogDumpHeader = new HeaderPacket();
        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
        connector.setDumping(true);
    }

    //......

}
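  • MysqlConnection provides several dump methods, which fall into two groups by starting point (binlog file name or gtidSet) and two groups by consumer (SinkFunction or MultiStageCoprocessor)
  • The binlogfilename-based dump methods first call updateSettings, loadBinlogChecksum, sendRegisterSlave, and sendBinlogDump, then create a DirectLogFetcher to pull data and hand it to the SinkFunction or MultiStageCoprocessor; the gtidSet-based dump methods first call updateSettings, loadBinlogChecksum, and sendBinlogDumpGTID, then create a DirectLogFetcher to pull data and hand it to the SinkFunction or MultiStageCoprocessor
  • The sendRegisterSlave method builds and sends a RegisterSlaveCommandPacket; the sendBinlogDumpGTID method builds and sends a BinlogDumpGTIDCommandPacket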
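
The response check at the end of sendRegisterSlave relies on Java bytes being signed: 0xFF, the marker of a MySQL error packet, reads as -1. The sketch below isolates that decision. The class, enum, and method names are hypothetical and not part of canal; the branches follow the `body[0] < 0` and `body[0] == -1` tests in the listing.

```java
final class RegisterSlaveResponseSketch {

    enum Kind { OK, ERROR, UNEXPECTED }

    /** Classify a response body by its first byte, as sendRegisterSlave does. */
    static Kind classify(byte[] body) {
        if (body == null || body.length == 0) {
            throw new IllegalArgumentException("empty response body");
        }
        byte first = body[0];
        if (first >= 0) {
            // 0x00..0x7F: accepted silently in the listing (an OK packet starts with 0x00)
            return Kind.OK;
        }
        if (first == -1) {
            // 0xFF as a signed byte: the listing parses an ErrorPacket and throws
            return Kind.ERROR;
        }
        // Any other value with the high bit set, for example 0xFE
        return Kind.UNEXPECTED;
    }

    public static void main(String[] args) {
        System.out.println(classify(new byte[] { 0x00 }));
        System.out.println(classify(new byte[] { (byte) 0xFF }));
        System.out.println(classify(new byte[] { (byte) 0xFE }));
    }
}
```

In the original method both non-OK cases end in an IOException; the enum here only makes the three branches visible.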

Summary

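MysqlConnection implements the ErosaConnection interface. Its constructors build an AuthenticationInfo and a MysqlConnector; its connect, reconnect, and disconnect methods delegate directly to the MysqlConnector; its fork method creates a new MysqlConnection around connector.fork(); and its queryServerId method runs show variables like 'server_id'. It also provides several dump methods, grouped by starting point (binlog file name or gtidSet) and by consumer (SinkFunction or MultiStageCoprocessor).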

Originally published 2020-04-25; shared from the WeChat official account 码匠的流水账.
