A Look at puma's DefaultDataHandler

This article examines puma's DefaultDataHandler.

DataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/DataHandler.java

public interface DataHandler extends LifeCycle {
    DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context);
}
  • DataHandler extends LifeCycle and defines a single process method.

AbstractDataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/AbstractDataHandler.java

@ThreadUnSafe
public abstract class AbstractDataHandler implements DataHandler {
    private static final Logger log = Logger.getLogger(AbstractDataHandler.class);

    private TableMetaInfoFetcher tableMetasInfoFetcher;

    /**
     * @return the tableMetasInfoFetcher
     */
    public TableMetaInfoFetcher getTableMetasInfoFetcher() {
        return tableMetasInfoFetcher;
    }

    /**
     * @param tableMetasInfoFetcher the tableMetasInfoFetcher to set
     */
    public void setTableMetasInfoFetcher(TableMetaInfoFetcher tableMetasInfoFetcher) {
        this.tableMetasInfoFetcher = tableMetasInfoFetcher;
    }

    @Override
    public void start() {
    }


    @Override
    public void stop() {

    }

    protected Object convertUnsignedValueIfNeeded(int pos, Object value, TableMetaInfo tableMeta) {
        Object newValue = value;
        if (value != null) {
            switch (tableMeta.getRawTypeCodes().get(pos)) {
                case BinlogConstants.MYSQL_TYPE_TINY:
                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {
                        newValue = Integer.valueOf((Integer) value + (1 << 8));
                    }
                    break;
                case BinlogConstants.MYSQL_TYPE_INT24:
                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {
                        newValue = Integer.valueOf((Integer) value + (1 << 24));
                    }
                    break;
                case BinlogConstants.MYSQL_TYPE_SHORT:
                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {
                        newValue = Integer.valueOf((Integer) value + (1 << 16));
                    }
                    break;
                case BinlogConstants.MYSQL_TYPE_INT:
                    if ((value instanceof Integer) && (Integer) value < 0 && !tableMeta.getSignedInfos().get(pos)) {
                        newValue = Long.valueOf((Integer) value) + (1L << 32);
                    } else {
                        if (value instanceof Integer) {
                            newValue = Long.valueOf((Integer) value);
                        }
                    }
                    break;
                case BinlogConstants.MYSQL_TYPE_LONGLONG:
                    if ((value instanceof Long) && (Long) value < 0 && !tableMeta.getSignedInfos().get(pos)) {
                        newValue = BigInteger.valueOf((Long) value).add(BigInteger.ONE.shiftLeft(64));
                    } else {
                        if (value instanceof Long) {
                            newValue = BigInteger.valueOf((Long) value);
                        }
                    }
                    break;
                default:
                    break;
            }

        }
        return newValue;
    }

    @Override
    public DataHandlerResult process(BinlogEvent binlogEvent, PumaContext context) {
        DataHandlerResult result = new DataHandlerResult();
        if (binlogEvent instanceof PumaIgnoreEvent) {
            log.info("Ingore one unknown event. eventType: " + binlogEvent.getHeader().getEventType());
            result.setEmpty(true);
            result.setFinished(true);
            return result;
        }

        byte eventType = binlogEvent.getHeader().getEventType();
        if (log.isDebugEnabled()) {
            log.debug("event#" + eventType);
        }
        if (eventType == BinlogConstants.STOP_EVENT || eventType == BinlogConstants.ROTATE_EVENT) {
            result.setEmpty(true);
            result.setFinished(true);
        } else if (eventType == BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            result.setEmpty(true);
            result.setFinished(true);
        } else if (eventType == BinlogConstants.QUERY_EVENT) {
            handleQueryEvent(binlogEvent, result);
        } else {
            doProcess(result, binlogEvent, context, eventType);
        }

        if (result != null && !result.isEmpty() && result.getData() != null) {
            BinlogInfo binlogInfo = new BinlogInfo(context.getDBServerId(), context.getBinlogFileName(),
                    context.getBinlogStartPos(), context.getEventIndex(), binlogEvent.getHeader().getTimestamp());
            result.getData().setBinlogInfo(binlogInfo);
            result.getData().setServerId(binlogEvent.getHeader().getServerId());
        }

        return result;
    }

    protected void handleQueryEvent(BinlogEvent binlogEvent, DataHandlerResult result) {
        QueryEvent queryEvent = (QueryEvent) binlogEvent;
        String sql = StringUtils.normalizeSpace(queryEvent.getSql());
        if (StringUtils.startsWithIgnoreCase(sql, "ALTER ") || StringUtils.startsWithIgnoreCase(sql, "CREATE ")
                || StringUtils.startsWithIgnoreCase(sql, "DROP ") || StringUtils.startsWithIgnoreCase(sql, "RENAME ")
                || StringUtils.startsWithIgnoreCase(sql, "TRUNCATE ")) {

            handleDDlEvent(result, queryEvent, sql);

        } else if (StringUtils.equalsIgnoreCase(sql, "BEGIN")) {

            handleTransactionBeginEvent(binlogEvent, result, queryEvent);
        } else {
            result.setEmpty(true);
            result.setFinished(true);
            // log.info("QueryEvent  sql=" + queryEvent.getSql());
        }
    }

    protected void handleTransactionBeginEvent(BinlogEvent binlogEvent, DataHandlerResult result,
                                               QueryEvent queryEvent) {
        // BEGIN event: emit a begin-transaction event
        ChangedEvent dataChangedEvent = new RowChangedEvent();
        ((RowChangedEvent) dataChangedEvent).setTransactionBegin(true);
        dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp());
        dataChangedEvent.setDatabase(queryEvent.getDatabaseName());

        result.setData(dataChangedEvent);
        result.setEmpty(false);
        result.setFinished(true);
    }

    /**
     * @param result
     * @param queryEvent
     * @param sql
     */
    protected void handleDDlEvent(DataHandlerResult result, QueryEvent queryEvent, String sql) {
        ChangedEvent dataChangedEvent = new DdlEvent();
        DdlEvent ddlEvent = (DdlEvent) dataChangedEvent;
        ddlEvent.setSql(sql);
        ddlEvent.setDdlEventType(SimpleDdlParser.getEventType(sql));

        ddlEvent.setDdlEventSubType(SimpleDdlParser.getEventSubType(ddlEvent.getDdlEventType(), sql));
        if (ddlEvent.getDdlEventType() == DdlEventType.DDL_DEFAULT
                || ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_SUB_DEFAULT) {
            log.info("DdlEvent Type do not found. ddl sql=" + sql);
        }
        SimpleDdlParser.DdlResult ddlResult = SimpleDdlParser
                .getDdlResult(ddlEvent.getDdlEventType(), ddlEvent.getDdlEventSubType(), sql);
        if (ddlResult != null) {
            ddlEvent.setDatabase(StringUtils.isNotBlank(ddlResult.getDatabase()) ? ddlResult.getDatabase()
                    : StringUtils.EMPTY);
            ddlEvent.setTable(StringUtils.isNotBlank(ddlResult.getTable()) ? ddlResult.getTable() : StringUtils.EMPTY);
            if (ddlEvent.getDdlEventType() != DdlEventType.DDL_CREATE) {
                log.info("DDL event, sql=" + sql + "  ,database =" + ddlResult.getDatabase() + " table ="
                        + ddlResult.getTable() + " queryEvent.getDatabaseName()" + queryEvent.getDatabaseName());
            }
        }
        if (StringUtils.isBlank(ddlEvent.getDatabase())) {
            ddlEvent.setDatabase(queryEvent.getDatabaseName());
        }

        if (ddlEvent.getDdlEventType() == DdlEventType.DDL_ALTER
                && ddlEvent.getDdlEventSubType() == DdlEventSubType.DDL_ALTER_TABLE) {
            ddlEvent.setDDLType(DDLType.ALTER_TABLE);
        }

        tableMetasInfoFetcher.refreshTableMeta(ddlEvent.getDatabase(), ddlEvent.getTable());

        ddlEvent.setExecuteTime(queryEvent.getHeader().getTimestamp());
        result.setData(dataChangedEvent);
        result.setEmpty(false);
        result.setFinished(true);
    }

    protected abstract void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context,
                                      byte eventType);
}
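  • AbstractDataHandler implements the DataHandler interface. Its process method returns an empty, finished result for PumaIgnoreEvent, STOP_EVENT, ROTATE_EVENT and FORMAT_DESCRIPTION_EVENT, calls handleQueryEvent for QUERY_EVENT, and hands every other event to doProcess, which subclasses implement. handleQueryEvent calls handleDDlEvent for statements that start with ALTER, CREATE, DROP, RENAME or TRUNCATE, and handleTransactionBeginEvent for BEGIN; any other statement yields an empty, finished result. handleDDlEvent calls tableMetasInfoFetcher.refreshTableMeta; handleTransactionBeginEvent builds a RowChangedEvent flagged as a transaction begin and marks the result as finished. The class also provides convertUnsignedValueIfNeeded, which maps negative values read from unsigned integer columns back into their unsigned range.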
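
The arithmetic in convertUnsignedValueIfNeeded from the listing above is easier to follow with concrete numbers. The following is a minimal sketch, not puma code: the class UnsignedConversionDemo and its method names are hypothetical, and it assumes the column is already known to be unsigned (puma checks tableMeta.getSignedInfos() for that).

```java
import java.math.BigInteger;

// Hypothetical demo class, not part of puma. It assumes the column is unsigned.
final class UnsignedConversionDemo {

    // Same idea as the TINY / SHORT / INT24 branches: add 2^bits to a negative value.
    static int toUnsigned(int signedValue, int bits) {
        return signedValue < 0 ? signedValue + (1 << bits) : signedValue;
    }

    // Same idea as the INT branch: widen to long, then add 2^32 if negative.
    static long toUnsignedInt(int signedValue) {
        long widened = signedValue;
        return widened < 0 ? widened + (1L << 32) : widened;
    }

    // Same idea as the LONGLONG branch: widen to BigInteger, then add 2^64 if negative.
    static BigInteger toUnsignedLong(long signedValue) {
        BigInteger widened = BigInteger.valueOf(signedValue);
        return signedValue < 0 ? widened.add(BigInteger.ONE.shiftLeft(64)) : widened;
    }

    public static void main(String[] args) {
        // An unsigned TINYINT holding 200 is read as the signed byte -56.
        System.out.println(toUnsigned((byte) 200, 8));      // 200
        // An unsigned SMALLINT holding 65535 is read as -1.
        System.out.println(toUnsigned((short) 65535, 16));  // 65535
        // An unsigned MEDIUMINT holding 16777215 is read as -1.
        System.out.println(toUnsigned(-1, 24));             // 16777215
        System.out.println(toUnsignedInt(-1));              // 4294967295
        System.out.println(toUnsignedLong(-1L));            // 18446744073709551615
    }
}
```

Each branch widens the result only as far as needed: the three small types still fit in an Integer, INT needs a Long, and BIGINT needs a BigInteger.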

DefaultDataHandler

puma/puma/src/main/java/com/dianping/puma/datahandler/DefaultDataHandler.java

@ThreadUnSafe
public class DefaultDataHandler extends AbstractDataHandler {
    private Logger log = Logger.getLogger(DefaultDataHandler.class);

    private Map<Long, TableMetaInfo> tableMetaInfos;

    private int rowPos = 0;

    @Override
    protected void doProcess(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context, byte eventType) {
        if (log.isDebugEnabled()) {
            log.debug("event:" + eventType);
        }
        switch (eventType) {
            case BinlogConstants.TABLE_MAP_EVENT:
                TableMapEvent tableMapEvent = (TableMapEvent) binlogEvent;

                if (tableMetaInfos == null) {
                    tableMetaInfos = new HashMap<Long, TableMetaInfo>();
                }

                TableMetaInfo tableMetaInfo = getTableMetasInfoFetcher().getTableMetaInfo(tableMapEvent.getDatabaseName(),
                        tableMapEvent.getTableName());

                if (tableMetaInfo != null) {
                    tableMetaInfos.put(tableMapEvent.getTableId(), tableMetaInfo);
                    if (log.isDebugEnabled()) {
                        log.debug("put meta info for table id:" + tableMapEvent.getTableId());
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("meta info not found for:" + tableMapEvent.getDatabaseName() + "-"
                                + tableMapEvent.getTableName());
                    }
                    skipEvent(BinlogConstants.TABLE_MAP_EVENT, result, context);
                    return;
                }

                fillRawTypeCodes(tableMapEvent, tableMetaInfo);
                fillRawNullAbilities(tableMapEvent, tableMetaInfo);
                rowPos = 0;
                result.setEmpty(true);
                result.setFinished(true);
                break;
            case BinlogConstants.WRITE_ROWS_EVENT_V1:
            case BinlogConstants.WRITE_ROWS_EVENT:
                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {
                    skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context);
                    return;
                }

                processWriteRowEvent(result, binlogEvent, context);
                break;
            case BinlogConstants.UPDATE_ROWS_EVENT_V1:
            case BinlogConstants.UPDATE_ROWS_EVENT:
                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {
                    skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context);
                    return;
                }
                processUpdateRowEvent(result, binlogEvent, context);
                break;
            case BinlogConstants.DELETE_ROWS_EVENT_V1:
            case BinlogConstants.DELETE_ROWS_EVENT:
                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {
                    skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context);
                    return;
                }
                processDeleteRowEvent(result, binlogEvent, context);
                break;
            case BinlogConstants.XID_EVENT:
                if (tableMetaInfos == null || tableMetaInfos.isEmpty()) {
                    skipEvent(BinlogConstants.XID_EVENT, result, context);
                    return;
                }
                processTransactionCommitEvent(binlogEvent, result);
                break;
            default:
                result.setEmpty(true);
                result.setFinished(true);
                break;
        }

    }

    //......

}
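  • DefaultDataHandler extends AbstractDataHandler, and its doProcess method handles each eventType differently. For TABLE_MAP_EVENT it looks up the TableMetaInfo and caches it in tableMetaInfos under the event's table id; for WRITE_ROWS_EVENT_V1 and WRITE_ROWS_EVENT it calls processWriteRowEvent; for UPDATE_ROWS_EVENT_V1 and UPDATE_ROWS_EVENT it calls processUpdateRowEvent; for DELETE_ROWS_EVENT_V1 and DELETE_ROWS_EVENT it calls processDeleteRowEvent; for XID_EVENT it calls processTransactionCommitEvent. Row events and XID events are skipped when tableMetaInfos is null or empty.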
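
The lifecycle of the tableMetaInfos map in doProcess can be modeled in a few lines. This is a simplified sketch under stated assumptions, not puma's implementation: TableMapCacheDemo is a hypothetical class, a plain String stands in for TableMetaInfo, and returning null or false stands in for skipEvent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the table-id cache; a String stands in for TableMetaInfo.
final class TableMapCacheDemo {
    private Map<Long, String> tableNames;

    // TABLE_MAP_EVENT: create the map lazily and register the table under its id.
    void onTableMap(long tableId, String qualifiedName) {
        if (tableNames == null) {
            tableNames = new HashMap<>();
        }
        tableNames.put(tableId, qualifiedName);
    }

    // Row event: returns the cached table, or null when the event would be skipped.
    String onRowEvent(long tableId) {
        if (tableNames == null || tableNames.isEmpty()) {
            return null;
        }
        return tableNames.get(tableId);
    }

    // XID_EVENT: returns false when skipped, otherwise drops the cache and returns true.
    boolean onCommit() {
        if (tableNames == null || tableNames.isEmpty()) {
            return false;
        }
        tableNames.clear();
        tableNames = null;
        return true;
    }

    public static void main(String[] args) {
        TableMapCacheDemo demo = new TableMapCacheDemo();
        System.out.println(demo.onRowEvent(42L)); // null (no table map seen yet)
        demo.onTableMap(42L, "shop.orders");
        System.out.println(demo.onRowEvent(42L)); // shop.orders
        System.out.println(demo.onRowEvent(7L));  // null (unknown table id)
        System.out.println(demo.onCommit());      // true
        System.out.println(demo.onRowEvent(42L)); // null (cache cleared by commit)
        System.out.println(demo.onCommit());      // false
    }
}
```

The cache therefore lives for a single transaction: it is filled by table-map events and discarded when the commit arrives.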

processWriteRowEvent

    protected void processWriteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        WriteRowsEvent writeRowsEvent = (WriteRowsEvent) binlogEvent;

        if (rowPos >= writeRowsEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
        } else {
            TableMetaInfo tableMetaInfo = tableMetaInfos.get(writeRowsEvent.getTableId());

            if (tableMetaInfo == null) {
                skipEvent(BinlogConstants.WRITE_ROWS_EVENT, result, context);
                return;
            }

            RowChangedEvent rowChangedEvent = new RowChangedEvent();
            Map<String, ColumnInfo> columns = initColumns(writeRowsEvent, rowChangedEvent, DMLType.INSERT,
                    tableMetaInfo);

            for (int columnPos = 0, columnIndex = 0; columnPos < writeRowsEvent.getColumnCount().intValue(); columnPos++) {
                if (writeRowsEvent.getUsedColumns().get(columnPos)) {
                    Column binlogColumn = writeRowsEvent.getRows().get(rowPos).getColumns().get(columnIndex);
                    String columnName = tableMetaInfo.getColumns().get(columnPos + 1);
                    if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                        return;
                    }
                    ColumnInfo columnInfo = new ColumnInfo(tableMetaInfo.getKeys().contains(columnName), null,
                            convertUnsignedValueIfNeeded(columnPos + 1, binlogColumn.getValue(), tableMetaInfo));
                    columns.put(columnName, columnInfo);
                    columnIndex++;
                }
            }

            rowPos++;
            result.setData(rowChangedEvent);
            result.setEmpty(false);
            result.setFinished(false);
        }
    }
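  • processWriteRowEvent builds a RowChangedEvent for the row at rowPos and fills its columns, passing each converted value as the last ColumnInfo constructor argument and null for the one before it. It then increments rowPos and leaves finished set to false; once rowPos reaches the end of the rows, it resets rowPos to 0 and returns an empty, finished result.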
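
The rowPos and finished handling suggests that the handler is called repeatedly for the same binlog event, yielding one row per call. The sketch below illustrates that protocol; RowCursorDemo, its Result class and the drain loop are hypothetical, and the calling loop in particular is an assumption about how a caller might use the flags, not code taken from puma.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the one-row-per-call protocol.
final class RowCursorDemo {

    // Stand-in for DataHandlerResult with only the fields used here.
    static final class Result {
        String data;
        boolean empty;
        boolean finished;
    }

    private int rowPos = 0;

    // Emits the row at rowPos, or an empty finished result once all rows are consumed.
    Result process(List<String> rows) {
        Result result = new Result();
        if (rowPos >= rows.size()) {
            rowPos = 0; // reset the cursor for the next event
            result.empty = true;
            result.finished = true;
        } else {
            result.data = rows.get(rowPos);
            rowPos++;
            result.empty = false;
            result.finished = false;
        }
        return result;
    }

    // Assumed caller: keep calling process for the same event until it reports finished.
    static List<String> drain(RowCursorDemo handler, List<String> rows) {
        List<String> emitted = new ArrayList<>();
        Result result;
        do {
            result = handler.process(rows);
            if (!result.empty) {
                emitted.add(result.data);
            }
        } while (!result.finished);
        return emitted;
    }

    public static void main(String[] args) {
        RowCursorDemo handler = new RowCursorDemo();
        System.out.println(drain(handler, Arrays.asList("row-1", "row-2", "row-3"))); // [row-1, row-2, row-3]
        System.out.println(drain(handler, Arrays.asList("row-4")));                   // [row-4]
    }
}
```

Because the cursor is an instance field, a handler written this way cannot be shared between threads, which matches the @ThreadUnSafe annotation on the puma classes.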

processUpdateRowEvent

    protected void processUpdateRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) binlogEvent;

        if (rowPos >= updateRowsEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
        } else {
            TableMetaInfo tableMetaInfo = tableMetaInfos.get(updateRowsEvent.getTableId());

            if (tableMetaInfo == null) {
                skipEvent(BinlogConstants.UPDATE_ROWS_EVENT, result, context);
                return;
            }

            RowChangedEvent rowChangedEvent = new RowChangedEvent();
            Map<String, ColumnInfo> columns = initColumns(updateRowsEvent, rowChangedEvent, DMLType.UPDATE,
                    tableMetaInfo);
            if (log.isDebugEnabled()) {
                log.debug("update from " + tableMetaInfo.getDatabase() + "." + tableMetaInfo.getTable());
            }

            for (int columnPos = 0, columnAfterIndex = 0, columnBeforeIndex = 0; columnPos < updateRowsEvent
                    .getColumnCount().intValue(); columnPos++) {
                String columnName = tableMetaInfo.getColumns().get(columnPos + 1);
                if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                    return;
                }
                Column afterColumn = null;
                Column beforeColumn = null;
                if (updateRowsEvent.getUsedColumnsAfter().get(columnPos)) {
                    afterColumn = updateRowsEvent.getRows().get(rowPos).getAfter().getColumns().get(columnAfterIndex);
                    columnAfterIndex++;
                }
                if (updateRowsEvent.getUsedColumnsBefore().get(columnPos)) {
                    beforeColumn = updateRowsEvent.getRows().get(rowPos).getBefore().getColumns().get(columnBeforeIndex);
                    columnBeforeIndex++;
                }
                ColumnInfo columnInfo = new ColumnInfo(tableMetaInfo.getKeys().contains(columnName),
                        beforeColumn == null ? null : convertUnsignedValueIfNeeded(columnPos + 1, beforeColumn.getValue(),
                                tableMetaInfo), afterColumn == null ? null : convertUnsignedValueIfNeeded(columnPos + 1,
                        afterColumn.getValue(), tableMetaInfo));
                columns.put(columnName, columnInfo);
            }

            rowPos++;
            result.setData(rowChangedEvent);
            result.setEmpty(false);
            result.setFinished(false);
        }
    }
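  • processUpdateRowEvent builds a RowChangedEvent for the row at rowPos and fills its columns. Every column gets a ColumnInfo holding its before value and its after value; a value is null when the column is not set in the corresponding used-columns bitmap (getUsedColumnsBefore or getUsedColumnsAfter).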
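
The loop in processUpdateRowEvent walks all table columns with columnPos while advancing two separate indexes into the before and after images, because each image only stores the columns marked as used. The following sketch shows that pairing; UsedColumnsDemo is hypothetical, java.util.BitSet is assumed as the bitmap type, and an Object[] of {before, after} stands in for ColumnInfo.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of pairing before/after images through used-column bitmaps.
final class UsedColumnsDemo {

    // Returns columnName -> {before, after}; a slot stays null when the column is unused.
    static Map<String, Object[]> pair(List<String> columnNames,
                                      BitSet usedBefore, List<Object> beforeValues,
                                      BitSet usedAfter, List<Object> afterValues) {
        Map<String, Object[]> columns = new LinkedHashMap<>();
        for (int columnPos = 0, beforeIndex = 0, afterIndex = 0;
             columnPos < columnNames.size(); columnPos++) {
            Object before = null;
            Object after = null;
            if (usedAfter.get(columnPos)) {
                after = afterValues.get(afterIndex++);   // advance only for used columns
            }
            if (usedBefore.get(columnPos)) {
                before = beforeValues.get(beforeIndex++);
            }
            columns.put(columnNames.get(columnPos), new Object[] {before, after});
        }
        return columns;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("id", "name", "email");
        BitSet usedBefore = new BitSet();
        usedBefore.set(0); // before image carries id and name
        usedBefore.set(1);
        BitSet usedAfter = new BitSet();
        usedAfter.set(1);  // after image carries name and email
        usedAfter.set(2);

        Map<String, Object[]> columns = pair(names,
                usedBefore, Arrays.<Object>asList(1, "old"),
                usedAfter, Arrays.<Object>asList("new", "a@b.c"));

        for (Map.Entry<String, Object[]> entry : columns.entrySet()) {
            System.out.println(entry.getKey() + " -> " + Arrays.toString(entry.getValue()));
        }
        // id -> [1, null]
        // name -> [old, new]
        // email -> [null, a@b.c]
    }
}
```

Indexing the images directly with columnPos would read the wrong value, or run past the end of the list, as soon as one column is missing from an image.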

processDeleteRowEvent

    protected void processDeleteRowEvent(DataHandlerResult result, BinlogEvent binlogEvent, PumaContext context) {
        DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) binlogEvent;

        if (rowPos >= deleteRowsEvent.getRows().size()) {
            rowPos = 0;
            result.setEmpty(true);
            result.setFinished(true);
        } else {
            TableMetaInfo tableMetaInfo = tableMetaInfos.get(deleteRowsEvent.getTableId());

            if (tableMetaInfo == null) {
                skipEvent(BinlogConstants.DELETE_ROWS_EVENT, result, context);
                return;
            }

            RowChangedEvent rowChangedEvent = new RowChangedEvent();
            Map<String, ColumnInfo> columns = initColumns(deleteRowsEvent, rowChangedEvent, DMLType.DELETE,
                    tableMetaInfo);

            for (int columnPos = 0, columnIndex = 0; columnPos < deleteRowsEvent.getColumnCount().intValue(); columnPos++) {
                if (deleteRowsEvent.getUsedColumns().get(columnPos)) {
                    Column binlogColumn = deleteRowsEvent.getRows().get(rowPos).getColumns().get(columnIndex);
                    String columnName = tableMetaInfo.getColumns().get(columnPos + 1);
                    if (!checkUnknownColumnName(result, context, columnName, columnPos + 1)) {
                        return;
                    }
                    ColumnInfo columnInfo = new ColumnInfo(tableMetaInfo.getKeys().contains(columnName),
                            convertUnsignedValueIfNeeded(columnPos + 1, binlogColumn.getValue(), tableMetaInfo), null);
                    columns.put(columnName, columnInfo);
                    columnIndex++;
                }
            }

            rowPos++;
            result.setData(rowChangedEvent);
            result.setEmpty(false);
            result.setFinished(false);
        }
    }
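  • processDeleteRowEvent builds a RowChangedEvent for the row at rowPos and fills its columns. It mirrors processWriteRowEvent, except that the converted value and null swap places in the ColumnInfo constructor, so the value is passed in the same position as the before value in processUpdateRowEvent.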

processTransactionCommitEvent

    protected void processTransactionCommitEvent(BinlogEvent binlogEvent, DataHandlerResult result) {
        // commit event: emit a commit-transaction event
        ChangedEvent dataChangedEvent = new RowChangedEvent();
        ((RowChangedEvent) dataChangedEvent).setTransactionCommit(true);
        dataChangedEvent.setExecuteTime(binlogEvent.getHeader().getTimestamp());
        dataChangedEvent.setDatabase(tableMetaInfos.values().iterator().next().getDatabase());

        result.setData(dataChangedEvent);
        result.setEmpty(false);
        result.setFinished(true);
        tableMetaInfos.clear();
        tableMetaInfos = null;
    }
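  • processTransactionCommitEvent builds a dataChangedEvent flagged as a transaction commit, takes its database from the first entry in tableMetaInfos, marks the result as finished, and then clears tableMetaInfos and sets it to null.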

Summary

DefaultDataHandler extends AbstractDataHandler, and its doProcess method handles each eventType differently: for TABLE_MAP_EVENT it updates the cached tableMetaInfo; for WRITE_ROWS_EVENT_V1 and WRITE_ROWS_EVENT it calls processWriteRowEvent; for UPDATE_ROWS_EVENT_V1 and UPDATE_ROWS_EVENT it calls processUpdateRowEvent; for DELETE_ROWS_EVENT_V1 and DELETE_ROWS_EVENT it calls processDeleteRowEvent; for XID_EVENT it calls processTransactionCommitEvent.

doc

  • DefaultDataHandler

This article was shared from the WeChat official account 码匠的流水账 (geek_luandun). Author: 码匠乱炖


Originally published: 2020-06-04
