A Look at canal's LogFetcher

Author: code4it
Published: 2020-04-23 10:48:27
Included in the column: 码匠的流水账

This article takes a look at canal's LogFetcher.

LogFetcher

canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogFetcher.java

public abstract class LogFetcher extends LogBuffer implements Closeable {

    /** Default initial capacity. */
    public static final int   DEFAULT_INITIAL_CAPACITY = 8192;

    /** Default growth factor. */
    public static final float DEFAULT_GROWTH_FACTOR    = 2.0f;

    /** Binlog file header size */
    public static final int   BIN_LOG_HEADER_SIZE      = 4;

    protected final float     factor;

    public LogFetcher(){
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public LogFetcher(final int initialCapacity){
        this(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public LogFetcher(final int initialCapacity, final float growthFactor){
        this.buffer = new byte[initialCapacity];
        this.factor = growthFactor;
    }

    /**
     * Increases the capacity of this <tt>LogFetcher</tt> instance, if
     * necessary, to ensure that it can hold at least the number of elements
     * specified by the minimum capacity argument.
     *
     * @param minCapacity the desired minimum capacity
     */
    protected final void ensureCapacity(final int minCapacity) {
        final int oldCapacity = buffer.length;

        if (minCapacity > oldCapacity) {
            int newCapacity = (int) (oldCapacity * factor);
            if (newCapacity < minCapacity) newCapacity = minCapacity;

            buffer = Arrays.copyOf(buffer, newCapacity);
        }
    }

    /**
     * Fetches the next frame of binary-log, and fill it in buffer.
     */
    public abstract boolean fetch() throws IOException;

    /**
     * {@inheritDoc}
     *
     * @see java.io.Closeable#close()
     */
    public abstract void close() throws IOException;
}
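  • LogFetcher extends LogBuffer and implements Closeable. It declares two abstract methods, fetch and close, and provides ensureCapacity, which grows the buffer by the growth factor (2.0 by default) whenever the requested minimum capacity exceeds the current buffer length.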
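
The growth policy of ensureCapacity can be tried out in isolation. The following is a minimal sketch, not canal code: GrowableBuffer and its capacity() accessor are hypothetical names introduced here for illustration, and only the body of ensureCapacity mirrors the listing above.

```java
import java.util.Arrays;

/**
 * Hypothetical stand-alone class (not part of canal) that mirrors the
 * growth policy of LogFetcher.ensureCapacity shown above.
 */
final class GrowableBuffer {

    private byte[]      buffer;
    private final float factor;

    GrowableBuffer(final int initialCapacity, final float growthFactor){
        this.buffer = new byte[initialCapacity];
        this.factor = growthFactor;
    }

    /** Grows the buffer only when minCapacity exceeds the current length. */
    void ensureCapacity(final int minCapacity) {
        final int oldCapacity = buffer.length;

        if (minCapacity > oldCapacity) {
            // First try old capacity * factor ...
            int newCapacity = (int) (oldCapacity * factor);
            // ... and fall back to the requested size if that is still too small.
            if (newCapacity < minCapacity) newCapacity = minCapacity;

            // Existing bytes are preserved; the new tail is zero-filled.
            buffer = Arrays.copyOf(buffer, newCapacity);
        }
    }

    /** Illustrative accessor (assumption, not in canal) exposing the current capacity. */
    int capacity() {
        return buffer.length;
    }
}
```

With an initial capacity of 8192 and a factor of 2.0, a request for 8193 bytes grows the buffer to 16384, while a request for 100000 bytes jumps straight to 100000, because doubling alone would not be enough.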

FileLogFetcher

canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/FileLogFetcher.java

public final class FileLogFetcher extends LogFetcher {

    public static final byte[] BINLOG_MAGIC = { -2, 0x62, 0x69, 0x6e };

    private FileInputStream    fin;

    public FileLogFetcher(){
        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public FileLogFetcher(final int initialCapacity){
        super(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public FileLogFetcher(final int initialCapacity, final float growthFactor){
        super(initialCapacity, growthFactor);
    }

    /**
     * Open binlog file in local disk to fetch.
     */
    public void open(File file) throws FileNotFoundException, IOException {
        open(file, 0L);
    }

    /**
     * Open binlog file in local disk to fetch.
     */
    public void open(String filePath) throws FileNotFoundException, IOException {
        open(new File(filePath), 0L);
    }

    /**
     * Open binlog file in local disk to fetch.
     */
    public void open(String filePath, final long filePosition) throws FileNotFoundException, IOException {
        open(new File(filePath), filePosition);
    }

    /**
     * Open binlog file in local disk to fetch.
     */
    public void open(File file, final long filePosition) throws FileNotFoundException, IOException {
        fin = new FileInputStream(file);

        ensureCapacity(BIN_LOG_HEADER_SIZE);
        if (BIN_LOG_HEADER_SIZE != fin.read(buffer, 0, BIN_LOG_HEADER_SIZE)) {
            throw new IOException("No binlog file header");
        }

        if (buffer[0] != BINLOG_MAGIC[0] || buffer[1] != BINLOG_MAGIC[1] || buffer[2] != BINLOG_MAGIC[2]
            || buffer[3] != BINLOG_MAGIC[3]) {
            throw new IOException("Error binlog file header: "
                                  + Arrays.toString(Arrays.copyOf(buffer, BIN_LOG_HEADER_SIZE)));
        }

        limit = 0;
        origin = 0;
        position = 0;

        if (filePosition > BIN_LOG_HEADER_SIZE) {
            final int maxFormatDescriptionEventLen = FormatDescriptionLogEvent.LOG_EVENT_MINIMAL_HEADER_LEN
                                                     + FormatDescriptionLogEvent.ST_COMMON_HEADER_LEN_OFFSET
                                                     + LogEvent.ENUM_END_EVENT + LogEvent.BINLOG_CHECKSUM_ALG_DESC_LEN
                                                     + LogEvent.CHECKSUM_CRC32_SIGNATURE_LEN;

            ensureCapacity(maxFormatDescriptionEventLen);
            limit = fin.read(buffer, 0, maxFormatDescriptionEventLen);
            limit = (int) getUint32(LogEvent.EVENT_LEN_OFFSET);
            fin.getChannel().position(filePosition);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#fetch()
     */
    public boolean fetch() throws IOException {
        if (limit == 0) {
            final int len = fin.read(buffer, 0, buffer.length);
            if (len >= 0) {
                limit += len;
                position = 0;
                origin = 0;

                /* More binlog to fetch */
                return true;
            }
        } else if (origin == 0) {
            if (limit > buffer.length / 2) {
                ensureCapacity(buffer.length + limit);
            }
            final int len = fin.read(buffer, limit, buffer.length - limit);
            if (len >= 0) {
                limit += len;

                /* More binlog to fetch */
                return true;
            }
        } else if (limit > 0) {
            if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
                int lenPosition = position + 4 + 1 + 4;
                long eventLen = ((long) (0xff & buffer[lenPosition++])) | ((long) (0xff & buffer[lenPosition++]) << 8)
                                | ((long) (0xff & buffer[lenPosition++]) << 16)
                                | ((long) (0xff & buffer[lenPosition++]) << 24);

                if (limit >= eventLen) {
                    return true;
                } else {
                    ensureCapacity((int) eventLen);
                }
            }

            System.arraycopy(buffer, origin, buffer, 0, limit);
            position -= origin;
            origin = 0;
            final int len = fin.read(buffer, limit, buffer.length - limit);
            if (len >= 0) {
                limit += len;

                /* More binlog to fetch */
                return true;
            }
        } else {
            /* Should not happen. */
            throw new IllegalArgumentException("Unexcepted limit: " + limit);
        }

        /* Reach binlog file end */
        return false;
    }

    /**
     * {@inheritDoc}
     *
     * @see com.taobao.tddl.dbsync.binlog.LogFetcher#close()
     */
    public void close() throws IOException {
        if (fin != null) {
            fin.close();
        }

        fin = null;
    }
}
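  • FileLogFetcher extends LogFetcher. Its open methods open a local binlog file and verify the 4-byte magic header before any events are read. Its fetch method reads from the FileInputStream into the buffer, returning true while more data is available and false once the end of the binlog file has been reached. Its close method closes the FileInputStream.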
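
Two byte-level steps in the listing above are easy to miss: the magic-header comparison in open, and the little-endian decoding of the event length in fetch (at offset 4 + 1 + 4 from the start of the event). The following is a minimal sketch of both, not canal code: the class BinlogHeaderSketch and its method names are hypothetical, and it works on a plain byte array rather than a FileInputStream.

```java
/**
 * Hypothetical helper (not part of canal) isolating two steps of
 * FileLogFetcher: the magic-header check and the event-length decode.
 */
final class BinlogHeaderSketch {

    /** Same bytes as FileLogFetcher.BINLOG_MAGIC: 0xfe followed by "bin". */
    static final byte[] BINLOG_MAGIC     = { (byte) 0xfe, 0x62, 0x69, 0x6e };

    /** Offset used in fetch(): 4 + 1 + 4 bytes precede the event length. */
    static final int    EVENT_LEN_OFFSET = 4 + 1 + 4;

    /** Returns true if buf starts with the 4-byte binlog magic. */
    static boolean hasBinlogMagic(final byte[] buf) {
        if (buf.length < BINLOG_MAGIC.length) return false;
        for (int i = 0; i < BINLOG_MAGIC.length; i++) {
            if (buf[i] != BINLOG_MAGIC[i]) return false;
        }
        return true;
    }

    /** Reads an unsigned 32-bit little-endian integer starting at pos. */
    static long readUint32LE(final byte[] buf, int pos) {
        return ((long) (0xff & buf[pos++])) | ((long) (0xff & buf[pos++]) << 8)
               | ((long) (0xff & buf[pos++]) << 16) | ((long) (0xff & buf[pos]) << 24);
    }

    /** Decodes the length of the event whose header begins at eventStart. */
    static long eventLength(final byte[] buf, final int eventStart) {
        return readUint32LE(buf, eventStart + EVENT_LEN_OFFSET);
    }
}
```

Masking each byte with 0xff before widening to long matters here: Java bytes are signed, so without the mask any byte at or above 0x80 would sign-extend and corrupt the decoded length.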

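Summary

LogFetcher extends LogBuffer and declares two abstract methods, fetch and close. FileLogFetcher is the implementation that reads binlog data from a local file.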

Originally published on 2020-04-21 on the WeChat official account 码匠的流水账.
