
A Look at canal's EventTransactionBuffer

Original
code4it
Modified 2020-04-26 10:11:04
This article is part of the column 码匠的流水账.

This article takes a look at canal's EventTransactionBuffer.

EventTransactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

public class EventTransactionBuffer extends AbstractCanalLifeCycle {

    private static final long        INIT_SQEUENCE = -1;
    private int                      bufferSize    = 1024;
    private int                      indexMask;
    private CanalEntry.Entry[]       entries;

    private AtomicLong               putSequence   = new AtomicLong(INIT_SQEUENCE); // sequence of the last slot written by put
    private AtomicLong               flushSequence = new AtomicLong(INIT_SQEUENCE); // sequence of the last entry flushed once a flush condition was met

    private TransactionFlushCallback flushCallback;

    public EventTransactionBuffer(){

    }

    public EventTransactionBuffer(TransactionFlushCallback flushCallback){
        this.flushCallback = flushCallback;
    }

    public void start() throws CanalStoreException {
        super.start();
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        Assert.notNull(flushCallback, "flush callback is null!");
        indexMask = bufferSize - 1;
        entries = new CanalEntry.Entry[bufferSize];
    }

    public void stop() throws CanalStoreException {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);

        entries = null;
        super.stop();
    }

    public void add(List<CanalEntry.Entry> entrys) throws InterruptedException {
        for (CanalEntry.Entry entry : entrys) {
            add(entry);
        }
    }

    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (entry.getEntryType()) {
            case TRANSACTIONBEGIN:
                flush(); // flush whatever the previous transaction left behind
                put(entry);
                break;
            case TRANSACTIONEND:
                put(entry);
                flush();
                break;
            case ROWDATA:
                put(entry);
                // non-DML data is emitted right away instead of being buffered
                EventType eventType = entry.getHeader().getEventType();
                if (eventType != null && !isDml(eventType)) {
                    flush();
                }
                break;
            case HEARTBEAT:
                // a heartbeat from the master means the binlog has been fully read and the source is idle
                put(entry);
                flush();
                break;
            default:
                break;
        }
    }

    public void reset() {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);
    }

    private void put(CanalEntry.Entry data) throws InterruptedException {
        // first check whether there is a free slot
        if (checkFreeSlotAt(putSequence.get() + 1)) {
            long current = putSequence.get();
            long next = current + 1;

            // write the data first, then update the cursor: under high concurrency, publishing
            // putSequence first could let a get request see it and read a stale Entry from the ring buffer
            entries[getIndex(next)] = data;
            putSequence.set(next);
        } else {
            flush(); // the buffer is full, flush it
            put(data); // then retry adding the new entry
        }
    }

    private void flush() throws InterruptedException {
        long start = this.flushSequence.get() + 1;
        long end = this.putSequence.get();

        if (start <= end) {
            List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>();
            for (long next = start; next <= end; next++) {
                transaction.add(this.entries[getIndex(next)]);
            }

            flushCallback.flush(transaction);
            flushSequence.set(end); // advance the flush position only after a successful flush
        }
    }

    //......

}
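  • EventTransactionBuffer extends AbstractCanalLifeCycle. Its start method checks that bufferSize is a power of 2 and allocates a CanalEntry.Entry array of bufferSize slots. Its stop method resets putSequence and flushSequence to INIT_SQEUENCE and sets entries to null. Its add method dispatches on entry.getEntryType(), essentially by calling put and flush: TRANSACTIONBEGIN flushes the previous data and then puts; TRANSACTIONEND and HEARTBEAT put and then flush; ROWDATA puts and flushes immediately only when the event type is not DML. Its reset method resets putSequence and flushSequence to INIT_SQEUENCE. The put method assigns the entry to its slot in entries and then advances putSequence; if the buffer is full it flushes first and then puts again. The flush method passes the entries from flushSequence + 1 through putSequence to flushCallback.flush(transaction) and then updates flushSequence.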
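The put/flush interplay is easiest to see in isolation. Below is a minimal, hypothetical sketch (MiniTransactionBuffer is not a canal class) that keeps the same putSequence/flushSequence logic but stores Strings and uses a java.util.function.Consumer as the callback. getIndex and checkFreeSlotAt are elided in the listing above, so the versions here are assumptions: the slot index is sequence & indexMask, and a slot counts as free unless writing it would overwrite an entry that has not been flushed yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for EventTransactionBuffer: String entries and a
// Consumer callback instead of CanalEntry.Entry and TransactionFlushCallback.
public class MiniTransactionBuffer {

    private static final long INIT_SEQUENCE = -1;

    private final int bufferSize;
    private final int indexMask;
    private final String[] entries;
    private final AtomicLong putSequence = new AtomicLong(INIT_SEQUENCE);
    private final AtomicLong flushSequence = new AtomicLong(INIT_SEQUENCE);
    private final Consumer<List<String>> flushCallback;

    public MiniTransactionBuffer(int bufferSize, Consumer<List<String>> flushCallback) {
        // Same check as start(): a power of two has exactly one bit set
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.indexMask = bufferSize - 1;
        this.entries = new String[bufferSize];
        this.flushCallback = flushCallback;
    }

    // Assumed helper: for a power-of-two size, sequence & mask == sequence % bufferSize
    private int getIndex(long sequence) {
        return (int) (sequence & indexMask);
    }

    // Assumed helper: the slot is free unless it still holds an entry that was never flushed
    private boolean checkFreeSlotAt(long sequence) {
        long wrapPoint = sequence - bufferSize;
        return wrapPoint <= flushSequence.get();
    }

    public void put(String data) {
        if (checkFreeSlotAt(putSequence.get() + 1)) {
            long next = putSequence.get() + 1;
            entries[getIndex(next)] = data; // write the slot first...
            putSequence.set(next);          // ...then publish the new sequence
        } else {
            flush();   // buffer full: hand all pending entries to the callback
            put(data); // retry; the slot has been released
        }
    }

    public void flush() {
        long start = flushSequence.get() + 1;
        long end = putSequence.get();
        if (start <= end) {
            List<String> batch = new ArrayList<>();
            for (long next = start; next <= end; next++) {
                batch.add(entries[getIndex(next)]);
            }
            flushCallback.accept(batch);
            flushSequence.set(end); // only now may these slots be reused
        }
    }

    public static void main(String[] args) {
        List<List<String>> flushed = new ArrayList<>();
        MiniTransactionBuffer buffer = new MiniTransactionBuffer(4, flushed::add);
        for (int i = 0; i < 6; i++) {
            buffer.put("e" + i);
        }
        buffer.flush();
        System.out.println(flushed); // [[e0, e1, e2, e3], [e4, e5]]
    }
}
```

With bufferSize = 4, the fifth put finds no free slot, so the first four entries are flushed as one batch before e4 is written into slot 0. The power-of-two requirement is what lets a bitwise AND replace the modulo when mapping a sequence to a slot.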

TransactionFlushCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

    public static interface TransactionFlushCallback {

        public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException;
    }
  • The TransactionFlushCallback interface defines a single flush method, which receives the flushed batch as a List of CanalEntry.Entry.
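In Java 8+ a single-method interface shaped like TransactionFlushCallback can be implemented with a lambda or method reference. The hypothetical sketch below (FlushCallbackDemo and its local Entry/EntryType types are illustrative, not canal classes) replaces the ring buffer with a plain list and mirrors the add() switch, leaving out the non-DML ROWDATA case and the buffer-full case, to show that each callback invocation then receives one complete transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: only the entry type matters, so an entry is a (type, label)
// pair and the pending entries live in a plain list instead of a ring buffer.
public class FlushCallbackDemo {

    enum EntryType { TRANSACTIONBEGIN, ROWDATA, TRANSACTIONEND, HEARTBEAT }

    // Same shape as canal's TransactionFlushCallback, but over the local Entry type
    interface TransactionFlushCallback {
        void flush(List<Entry> transaction) throws InterruptedException;
    }

    static final class Entry {
        final EntryType type;
        final String label;

        Entry(EntryType type, String label) {
            this.type = type;
            this.label = label;
        }

        @Override
        public String toString() {
            return label;
        }
    }

    private final List<Entry> pending = new ArrayList<>();
    private final TransactionFlushCallback callback;

    FlushCallbackDemo(TransactionFlushCallback callback) {
        this.callback = callback;
    }

    // Mirrors the switch in EventTransactionBuffer.add(), minus the non-DML ROWDATA rule
    void add(Entry entry) throws InterruptedException {
        switch (entry.type) {
            case TRANSACTIONBEGIN:
                flush();            // emit whatever the previous transaction left behind
                pending.add(entry);
                break;
            case TRANSACTIONEND:
            case HEARTBEAT:
                pending.add(entry);
                flush();            // transaction complete (or source idle): emit now
                break;
            case ROWDATA:
                pending.add(entry); // held until the transaction ends
                break;
        }
    }

    private void flush() throws InterruptedException {
        if (!pending.isEmpty()) {
            callback.flush(new ArrayList<>(pending)); // hand over a copy of the batch
            pending.clear();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<Entry>> batches = new ArrayList<>();
        FlushCallbackDemo buffer = new FlushCallbackDemo(batches::add); // method reference as the callback
        buffer.add(new Entry(EntryType.TRANSACTIONBEGIN, "BEGIN"));
        buffer.add(new Entry(EntryType.ROWDATA, "INSERT#1"));
        buffer.add(new Entry(EntryType.ROWDATA, "INSERT#2"));
        buffer.add(new Entry(EntryType.TRANSACTIONEND, "COMMIT"));
        buffer.add(new Entry(EntryType.HEARTBEAT, "HEARTBEAT"));
        System.out.println(batches); // [[BEGIN, INSERT#1, INSERT#2, COMMIT], [HEARTBEAT]]
    }
}
```

In the real EventTransactionBuffer a batch can also be cut short when the ring buffer fills up or when a non-DML ROWDATA entry arrives, so a consumer should not assume every batch starts with TRANSACTIONBEGIN.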

EntryProtocol.proto

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

syntax = "proto3";
package com.alibaba.otter.canal.protocol;

option java_package = "com.alibaba.otter.canal.protocol";
option java_outer_classname = "CanalEntry";
option optimize_for = SPEED;

/****************************************************************
 * message model
 * When adding values to an enum, keep the indices of the existing values unchanged.
 ****************************************************************/
message Entry {
    /** protocol header **/
    Header                     header              = 1;
    ///** entry type after a transaction is split into entries **/ [default = ROWDATA]
    oneof entryType_present{
        EntryType                   entryType           = 2;
    }

    /** binary payload being transferred **/
    bytes                       storeValue          = 3;
}

/** message Header **/
message Header {
    /** protocol version **/  //[default = 1]
    oneof version_present {
        int32                   version             = 1;
    }

    /** binlog/redolog file name **/
    string                  logfileName         = 2;

    /** offset within the binlog/redolog file **/
    int64                   logfileOffset       = 3;

    /** serverId of the source server **/
    int64               serverId            = 4;

    /** encoding of the changed data **/
    string                  serverenCode        = 5;

    /** execution time of the change **/
    int64                   executeTime         = 6;

    /** source of the change **/ //[default = MYSQL]
    oneof sourceType_present {
        Type                    sourceType          = 7;
    }

    /** schema name of the change **/
    string                  schemaName          = 8;

    /** table name of the change **/
    string                  tableName           = 9;

    /** length of each event **/
    int64                   eventLength         = 10;

    /** type of data change **/  // [default = UPDATE]
    oneof eventType_present {
        EventType               eventType           = 11;
    }

    /** reserved for extensions **/
    repeated Pair                   props               = 12;

    /** GTID of the current transaction **/
    string                 gtid                = 13;
}

/** data structure of each column **/
message Column {
    /** column index **/
    int32       index           =       1;

    /** column type in Java **/
    int32       sqlType         =       2;

    /** column name (case-insensitive); not present in the MySQL binlog itself **/
    string      name            =       3;

    /** whether the column is a primary key column **/
    bool        isKey           =       4;

    /** when EventType=UPDATE, marks whether this column's value was changed **/
    bool        updated         =       5;

    /** whether the value is null **/ //[default = false]
    oneof isNull_present {
        bool        isNull          =       6;
    }

    /** reserved for extensions **/
    repeated Pair       props           =       7;

    /** column value; timestamp and Datetime values are formatted time strings **/
    string      value           =       8;

    /** original length of the data object **/
    int32       length          =       9;

    /** MySQL type of the column **/
    string      mysqlType       =       10;
}

message RowData {

    /** column values, incremental data (before update, before delete) **/
    repeated Column         beforeColumns   =       1;

    /** column values, incremental data (after update, after insert) **/
    repeated Column         afterColumns    =       2;

    /** reserved for extensions **/
    repeated Pair           props           =       3;
}

/** message row: data structure of each row change **/
message RowChange {

    /** tableId, generated by the database **/
    int64           tableId         =       1;

    /** type of data change **/ //[default = UPDATE]
    oneof eventType_present {
        EventType       eventType       =       2;
    }

    /** whether this is a DDL statement **/ // [default = false]
    oneof isDdl_present {
        bool            isDdl           =       10;
    }

    /** SQL text of the DDL/query **/
    string          sql             =       11;

    /** a single database change may affect multiple rows **/
    repeated RowData        rowDatas        =       12;

    /** reserved for extensions **/
    repeated Pair           props           =       13;

    /** schemaName of the DDL/query; DDL can span databases, so the schema in effect when the DDL ran is kept **/
    string          ddlSchemaName   =       14;
}

/** information about a transaction begin **/
message TransactionBegin{

    /** deprecated, use executeTime in the header **/
    int64           executeTime     =       1;

    /** deprecated, Begin does not carry a transaction id **/
    string          transactionId   =       2;

    /** reserved for extensions **/
    repeated Pair           props           =       3;

    /** id of the executing thread **/
    int64           threadId        =       4;
}

/** information about a transaction end **/
message TransactionEnd{

    /** deprecated, use executeTime in the header **/
    int64           executeTime     =       1;

    /** transaction id **/
    string          transactionId   =       2;

    /** reserved for extensions **/
    repeated Pair           props           =       3;
}

/** reserved for extensions **/
message Pair{
    string      key             =           1;
    string      value           =           2;
}

/** entry type after splitting, mainly marking transaction begin, row data, and transaction end **/
enum EntryType{
    ENTRYTYPECOMPATIBLEPROTO2 = 0;
    TRANSACTIONBEGIN        =       1;
    ROWDATA                 =       2;
    TRANSACTIONEND          =       3;
    /** heartbeat type, internal use only, not visible externally for now; can be ignored **/
    HEARTBEAT               =       4;
    GTIDLOG                 =       5;
}

/** event type **/
enum EventType {
    EVENTTYPECOMPATIBLEPROTO2 = 0;
    INSERT      =       1;
    UPDATE      =       2;
    DELETE      =       3;
    CREATE      =       4;
    ALTER       =       5;
    ERASE       =       6;
    QUERY       =       7;
    TRUNCATE    =       8;
    RENAME      =       9;
    /**CREATE INDEX**/
    CINDEX      =       10;
    DINDEX      =       11;
    GTID        =       12;
    /** XA **/
    XACOMMIT    =       13;
    XAROLLBACK  =       14;
    /** MASTER HEARTBEAT **/
    MHEARTBEAT  =       15;
}

/** database type **/
enum Type {
    TYPECOMPATIBLEPROTO2 = 0;
    ORACLE      =       1;
    MYSQL       =       2;
    PGSQL       =       3;
}
  • EntryProtocol.proto defines CanalEntry.Entry, which consists of a header, an entryType, and a storeValue holding the serialized binary payload.
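The EventType enum decides whether a ROWDATA entry is buffered or flushed at once. The sketch below mirrors that enum as a local Java enum, declared in the same order so that ordinal() matches the proto field numbers. It assumes that isDml, which is elided from the EventTransactionBuffer listing, treats only INSERT, UPDATE, and DELETE as DML; EventTypeDemo and flushesImmediately are hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical demo, not canal code: a local copy of the proto's EventType enum plus
// the ROWDATA flush rule from EventTransactionBuffer.add().
public class EventTypeDemo {

    // Declared in proto order, so ordinal() equals the proto field number (0..15)
    enum EventType {
        EVENTTYPECOMPATIBLEPROTO2, INSERT, UPDATE, DELETE, CREATE, ALTER, ERASE, QUERY,
        TRUNCATE, RENAME, CINDEX, DINDEX, GTID, XACOMMIT, XAROLLBACK, MHEARTBEAT
    }

    // Assumption: isDml accepts only INSERT/UPDATE/DELETE
    private static final Set<EventType> DML =
            EnumSet.of(EventType.INSERT, EventType.UPDATE, EventType.DELETE);

    static boolean isDml(EventType eventType) {
        return DML.contains(eventType);
    }

    // Same condition as the ROWDATA branch: non-null and not DML -> flush right away
    static boolean flushesImmediately(EventType eventType) {
        return eventType != null && !isDml(eventType);
    }

    public static void main(String[] args) {
        for (EventType type : EventType.values()) {
            System.out.println(type.ordinal() + " " + type + " -> flush immediately: "
                    + flushesImmediately(type));
        }
    }
}
```

Under this assumption, DDL such as ALTER or TRUNCATE is handed to the callback right away, while row changes stay buffered until their TRANSACTIONEND arrives.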

AbstractEventParser.transactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {

    //......

    public AbstractEventParser(){
        // initialize the transaction buffer
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
                if (!running) {
                    return;
                }

                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTransactionPosition(transaction);
                if (position != null) { // the position may be null
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }

    protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                           InterruptedException {
        long startTs = -1;
        boolean enabled = getProfilingEnabled();
        if (enabled) {
            startTs = System.currentTimeMillis();
        }

        boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

        if (enabled) {
            this.processingInterval = System.currentTimeMillis() - startTs;
        }

        if (consumedEventCount.incrementAndGet() < 0) {
            consumedEventCount.set(0);
        }

        return result;
    }

    //......

}
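  • The AbstractEventParser constructor creates the EventTransactionBuffer with an anonymous TransactionFlushCallback. That callback calls consumeTheEventAndProfilingIfNecessary and returns early if the parser is no longer running; if consumption failed it throws CanalParseException, otherwise it builds the last transaction position and, when it is not null, persists it via logPositionManager.persistLogPosition. consumeTheEventAndProfilingIfNecessary in turn calls eventSink.sink and, when profiling is enabled, records the processing time.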
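The order of operations in the callback matters: a position is persisted only after the sink reports success. The hypothetical sketch below reproduces that flow. ParserFlushSketch, EventSink, PositionStore, and the simplified Entry are stand-ins rather than canal APIs, IllegalStateException takes the place of CanalParseException, and since buildLastTransactionPosition is not shown above, the sketch assumes the position comes from the last TRANSACTIONEND entry in the batch, which would explain the null check in the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the flush callback installed by AbstractEventParser:
// sink the batch first, persist a position only after the sink succeeded.
public class ParserFlushSketch {

    // Simplified entry: whether it ends a transaction, plus its binlog offset
    static final class Entry {
        final boolean transactionEnd;
        final long logfileOffset;

        Entry(boolean transactionEnd, long logfileOffset) {
            this.transactionEnd = transactionEnd;
            this.logfileOffset = logfileOffset;
        }
    }

    interface EventSink { boolean sink(List<Entry> entries); }                 // stand-in for eventSink
    interface PositionStore { void persist(String destination, long offset); } // stand-in for logPositionManager

    private final String destination;
    private final EventSink eventSink;
    private final PositionStore positionStore;
    private volatile boolean running = true;

    ParserFlushSketch(String destination, EventSink eventSink, PositionStore positionStore) {
        this.destination = destination;
        this.eventSink = eventSink;
        this.positionStore = positionStore;
    }

    void stop() {
        running = false;
    }

    // Assumption: take the offset of the last TRANSACTIONEND; a batch without one yields null
    static Long lastTransactionOffset(List<Entry> transaction) {
        for (int i = transaction.size() - 1; i >= 0; i--) {
            if (transaction.get(i).transactionEnd) {
                return transaction.get(i).logfileOffset;
            }
        }
        return null;
    }

    void flush(List<Entry> transaction) {
        boolean succeeded = eventSink.sink(transaction);
        if (!running) {
            return; // shutting down: do not record progress
        }
        if (!succeeded) {
            throw new IllegalStateException("consume failed!"); // CanalParseException in canal
        }
        Long offset = lastTransactionOffset(transaction);
        if (offset != null) { // the position may be null
            positionStore.persist(destination, offset);
        }
    }

    public static void main(String[] args) {
        List<String> persisted = new ArrayList<>();
        ParserFlushSketch parser = new ParserFlushSketch("example",
                entries -> true,
                (dest, offset) -> persisted.add(dest + "@" + offset));
        parser.flush(Arrays.asList(new Entry(false, 100), new Entry(false, 180), new Entry(true, 240)));
        parser.flush(Arrays.asList(new Entry(false, 300))); // no TRANSACTIONEND: nothing persisted
        System.out.println(persisted); // [example@240]
    }
}
```

In this sketch a failed sink throws before anything is persisted, so the stored position never moves past a batch the sink did not accept.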

Summary

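EventTransactionBuffer extends AbstractCanalLifeCycle. Its start method allocates a CanalEntry.Entry array of bufferSize slots; its stop method resets putSequence and flushSequence to INIT_SQEUENCE and sets entries to null; its add method dispatches on entry.getEntryType(), essentially calling put and flush; its reset method resets putSequence and flushSequence to INIT_SQEUENCE. The put method assigns the entry to entries while advancing putSequence, and when the buffer is full it flushes first and then puts again. The flush method calls flushCallback.flush(transaction) and then updates flushSequence.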

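Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.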

For infringement concerns, contact cloudcommunity@tencent.com to have it removed.
