前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊puma的Parser

聊聊puma的Parser

作者头像
code4it
发布2020-06-04 10:10:04
3080
发布2020-06-04 10:10:04
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下puma的Parser

Parser

puma/puma/src/main/java/com/dianping/puma/parser/Parser.java

代码语言:javascript
复制
public interface Parser extends LifeCycle {
    BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException;
}
  • Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent

DefaultBinlogParser

puma/puma/src/main/java/com/dianping/puma/parser/DefaultBinlogParser.java

代码语言:javascript
复制
@ThreadSafe
public class DefaultBinlogParser implements Parser {
    private final Logger logger = LoggerFactory.getLogger(DefaultBinlogParser.class);
    private static Map<Byte, Class<? extends BinlogEvent>> eventMaps = new ConcurrentHashMap<Byte, Class<? extends BinlogEvent>>();

    @Override
    public BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException {

        logger.debug("\n\n\n");
        logger.debug("****************************** binlog parse begin ******************************");

        BinlogHeader header = new BinlogHeader();
        header.parse(buf, context);

        logger.debug("binlog event header:\n");
        logger.debug("{}", header);

        BinlogEvent event = null;
        Class<? extends BinlogEvent> eventClass = eventMaps.get(header.getEventType());
        if (eventClass != null) {
            try {
                event = eventClass.newInstance();
            } catch (Exception e) {
                logger.error("Init event class failed. eventType: " + header.getEventType(), e);
                event = null;
            }
        }

        if (event == null) {
            event = new PumaIgnoreEvent();
        }

        logger.debug("binlog event type:\n");
        logger.debug("{}", event.getClass());

        event.parse(buf, context, header);

        logger.debug("binlog event:\n");
        logger.debug("{}", event);
        logger.debug("****************************** binlog parse end ******************************");
        logger.debug("\n\n\n");

        return event;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        eventMaps.put(BinlogConstants.UNKNOWN_EVENT, UnknownEvent.class);
        eventMaps.put(BinlogConstants.QUERY_EVENT, QueryEvent.class);
        eventMaps.put(BinlogConstants.STOP_EVENT, StopEvent.class);
        eventMaps.put(BinlogConstants.ROTATE_EVENT, RotateEvent.class);
        eventMaps.put(BinlogConstants.INTVAR_EVENT, IntVarEvent.class);
        eventMaps.put(BinlogConstants.RAND_EVENT, RandEvent.class);
        eventMaps.put(BinlogConstants.USER_VAR_EVENT, UserVarEvent.class);
        eventMaps.put(BinlogConstants.FORMAT_DESCRIPTION_EVENT, FormatDescriptionEvent.class);
        eventMaps.put(BinlogConstants.XID_EVENT, XIDEvent.class);
        eventMaps.put(BinlogConstants.TABLE_MAP_EVENT, TableMapEvent.class);
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT_V1, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT_V1, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT_V1, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.INCIDENT_EVENT, IncidentEvent.class);
        //mysql --5.6
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.HEARTBEAT_LOG_EVENT, HeartbeatEvent.class);
        eventMaps.put(BinlogConstants.IGNORABLE_LOG_EVENT, IgnorableEvent.class);
        eventMaps.put(BinlogConstants.ROWS_QUERY_LOG_EVENT, RowsQueryEvent.class);
        eventMaps.put(BinlogConstants.GTID_LOG_EVENT, GtidEvent.class);
        eventMaps.put(BinlogConstants.ANONYMOUS_GTID_LOG_EVENT, AnonymousGtidEvent.class);
        eventMaps.put(BinlogConstants.PREVIOUS_GTIDS_LOG_EVENT, PreviousGtidsEvent.class);
    }

    @Override
    public void stop() {

    }

}
  • DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

BinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/BinlogEvent.java

代码语言:javascript
复制
public interface BinlogEvent extends Serializable {
    BinlogHeader getHeader();
    
    void setHeader(BinlogHeader header);

    void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException;
}
  • BinlogEvent接口定义了getHeader、setHeader、parse方法

AbstractBinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/AbstractBinlogEvent.java

代码语言:javascript
复制
public abstract class AbstractBinlogEvent implements BinlogEvent {
    private static final long serialVersionUID = -8136236885229956889L;
    private BinlogHeader header;
    private int checksumAlg = BinlogConstants.CHECKSUM_ALG_OFF;
    private long crc;

    @Override
    public void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException {
        this.header = header;
        doParse(buf, context);
        if (!(this.header.getEventType() == BinlogConstants.ROTATE_EVENT)) {
            checksumAlg = context.getChecksumAlg(); // fetch checksum alg
            parseCheckSum(buf);
        }
    }

    @Override
    public BinlogHeader getHeader() {
        return header;
    }
    
    @Override
    public void setHeader(BinlogHeader header) {
        this.header = header;
    }

    public abstract void doParse(ByteBuffer buf, PumaContext context) throws IOException;

    private void parseCheckSum(ByteBuffer buf) {
        if (checksumAlg != BinlogConstants.CHECKSUM_ALG_OFF && checksumAlg != BinlogConstants.CHECKSUM_ALG_UNDEF) {
            buf.position((int) (this.header.getEventLength() - 4));
            setCrc(PacketUtils.readLong(buf, 4));
        }
    }

    @Override public String toString() {
        return new ToStringBuilder(this)
                .append("header", header)
                .append("checksumAlg", checksumAlg)
                .append("crc", crc)
                .toString();
    }

    public void setChecksumAlg(int checksumAlg) {
        this.checksumAlg = checksumAlg;
    }

    public int getChecksumAlg() {
        return checksumAlg;
    }

    public long getCrc() {
        return crc;
    }

    public void setCrc(long crc) {
        this.crc = crc;
    }

    public boolean isRemaining(ByteBuffer buf, PumaContext context) {
        return context.isCheckSum() ? buf.remaining() - 4 > 0 : buf.hasRemaining();
    }

    public int lenRemaining(ByteBuffer buf, PumaContext context) {
        return context.isCheckSum() ? buf.remaining() - 4 : buf.remaining();
    }
}
  • AbstractBinlogEvent声明实现了BinlogEvent接口,其parse方法会调用doParse方法,之后对于非ROTATE_EVENT会执行parseCheckSum

小结

Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent;DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

doc

  • Parser
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Parser
  • DefaultBinlogParser
  • BinlogEvent
  • AbstractBinlogEvent
  • 小结
  • doc
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档