前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal的Position

聊聊canal的Position

作者头像
code4it
发布2020-04-20 13:22:08
6420
发布2020-04-20 13:22:08
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下canal的Position

Position

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/Position.java

代码语言:javascript
复制
public abstract class Position implements Serializable {

    private static final long serialVersionUID = 2332798099928474975L;

    public String toString() {
        return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
    }

}
  • Position定义了toString方法,使用ToStringBuilder.reflectionToString方法以CanalToStringStyle.DEFAULT_STYLE来转换

TimePosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/TimePosition.java

代码语言:javascript
复制
public class TimePosition extends Position {

    private static final long serialVersionUID = 6185261261064226380L;
    protected Long            timestamp;

    public TimePosition(Long timestamp){
        this.timestamp = timestamp;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    //......
}
  • TimePosition继承了Position,它定义了timestamp属性

EntryPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

代码语言:javascript
复制
public class EntryPosition extends TimePosition {

    private static final long serialVersionUID      = 81432665066427482L;
    public static final int   EVENTIDENTITY_SEGMENT = 3;
    public static final char  EVENTIDENTITY_SPLIT   = (char) 5;

    private boolean           included              = false;
    private String            journalName;
    private Long              position;
    // add by agapple at 2016-06-28
    private Long              serverId              = null;              // 记录一下位点对应的serverId
    private String            gtid                  = null;

    //......
}
  • EntryPosition继承了TimePosition,它定义了included、journalName、position、serverId、gtid属性

SlaveEntryPosition

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/SlaveEntryPosition.java

代码语言:javascript
复制
public class SlaveEntryPosition extends EntryPosition {

    private static final long serialVersionUID = 5271424551446372093L;
    private final String      masterHost;
    private final String      masterPort;

    public SlaveEntryPosition(String fileName, long position, String masterHost, String masterPort){
        super(fileName, position);

        this.masterHost = masterHost;
        this.masterPort = masterPort;
    }

    public String getMasterHost() {
        return masterHost;
    }

    public String getMasterPort() {
        return masterPort;
    }
}
  • SlaveEntryPosition继承了EntryPosition,它定义了masterHost、masterPort属性

LogPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/LogPosition.java

代码语言:javascript
复制
public class LogPosition extends Position {

    private static final long serialVersionUID = 3875012010277005819L;
    private LogIdentity       identity;
    private EntryPosition     postion;

    public LogIdentity getIdentity() {
        return identity;
    }

    public void setIdentity(LogIdentity identity) {
        this.identity = identity;
    }

    public EntryPosition getPostion() {
        return postion;
    }

    public void setPostion(EntryPosition postion) {
        this.postion = postion;
    }

    //......
}
  • LogPosition继承了Position,它定义了identity、postion两个属性

cursor

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java

代码语言:javascript
复制
public interface CanalMetaManager extends CanalLifeCycle {
    
    //......

    /**
     * 获取 cursor 游标
     */
    Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;

    /**
     * 更新 cursor 游标
     */
    void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException;

    //......

}
  • CanalMetaManager定义了getCursor、updateCursor方法;其中getCursor方法根据clientIdentity返回Position;updateCursor方法则更新指定clientIdentity的position

MemoryMetaManager

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java

代码语言:javascript
复制
public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {

    protected Map<String, List<ClientIdentity>>              destinations;
    protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
    protected Map<ClientIdentity, Position>                  cursors;

    //......

    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        return cursors.get(clientIdentity);
    }

    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
        cursors.put(clientIdentity, position);
    }

    //......
}
  • MemoryMetaManager实现了CanalMetaManager接口,它定义了Map结构的cursors,key为ClientIdentity,value为Position;getCursor方法则根据clientIdentity从cursors获取Position;updateCursor方法则更新cursors中key为clientIdentity的value为position

小结

Position定义了toString方法,使用ToStringBuilder.reflectionToString方法以CanalToStringStyle.DEFAULT_STYLE来转换;它有两个直接子类,分别是TimePosition、LogPosition;而EntryPosition继承了TimePosition;SlaveEntryPosition继承了EntryPosition;LogPosition则组合了EntryPosition

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Position
  • TimePosition
    • EntryPosition
      • SlaveEntryPosition
      • LogPosition
      • cursor
      • MemoryMetaManager
      • 小结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档