A look at NiFi's AbstractBinlogTableEventWriter

Original article by code4it. Last modified 2020-05-29 09:58:30. Collected in the column 码匠的流水账.

This article takes a look at NiFi's AbstractBinlogTableEventWriter and its four subclasses.

AbstractBinlogTableEventWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java

public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {

    protected void writeJson(T event) throws IOException {
        super.writeJson(event);
        if (event.getDatabaseName() != null) {
            jsonGenerator.writeStringField("database", event.getDatabaseName());
        } else {
            jsonGenerator.writeNullField("database");
        }
        if (event.getTableName() != null) {
            jsonGenerator.writeStringField("table_name", event.getTableName());
        } else {
            jsonGenerator.writeNullField("table_name");
        }
        if (event.getTableId() != null) {
            jsonGenerator.writeNumberField("table_id", event.getTableId());
        } else {
            jsonGenerator.writeNullField("table_id");
        }
    }

    // Default implementation for table-related binlog events
    @Override
    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, (outputStream) -> {
            super.startJson(outputStream, eventInfo);
            writeJson(eventInfo);
            // Nothing in the body
            super.endJson();
        });
        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().receive(flowFile, transitUri);
        return currentSequenceId + 1;
    }
}
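  • AbstractBinlogTableEventWriter extends AbstractBinlogEventWriter, with its type parameter bounded by BinlogTableEventInfo. Its writeJson method appends the database, table_name, and table_id fields, writing an explicit null when a value is missing. Its default writeEvent emits one flow file per event and returns currentSequenceId + 1. It has four subclasses: DDLEventWriter, InsertRowsWriter, UpdateRowsWriter, and DeleteRowsWriter.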
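
To make the null handling in writeJson concrete, here is a minimal sketch that uses only the JDK. TableFieldsSketch, tableFields, and toJson are hypothetical names introduced for illustration, not NiFi or Jackson APIs, and the table ID is assumed to be a Long. The point is only that a missing value keeps its key and is written as null rather than being dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in, not a NiFi class: mirrors the null handling of writeJson.
class TableFieldsSketch {

    // Collects the three table-level fields in the order writeJson emits them.
    // A null value stays in the map, just as writeNullField keeps the key in the JSON.
    static Map<String, Object> tableFields(String databaseName, String tableName, Long tableId) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("database", databaseName);
        fields.put("table_name", tableName);
        fields.put("table_id", tableId);
        return fields;
    }

    // Minimal JSON rendering for this sketch only: strings are quoted without
    // escaping, numbers and null are written bare.
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof String) {
                sb.append('"').append(v).append('"');
            } else {
                sb.append(v); // a Long, or the literal text null
            }
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // prints {"database":"shop","table_name":"orders","table_id":42}
        System.out.println(toJson(tableFields("shop", "orders", 42L)));
        // prints {"database":"shop","table_name":null,"table_id":null}
        System.out.println(toJson(tableFields("shop", null, null)));
    }
}
```

Keeping the key with a null value means downstream consumers always see the same set of fields, whichever event produced the flow file.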

DDLEventWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java

public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {

    @Override
    public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, (outputStream) -> {
            super.startJson(outputStream, eventInfo);
            super.writeJson(eventInfo);
            jsonGenerator.writeStringField("query", eventInfo.getQuery());
            super.endJson();
        });
        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().receive(flowFile, transitUri);
        return currentSequenceId + 1;
    }
}
  • DDLEventWriter extends AbstractBinlogTableEventWriter. Its writeEvent method writes a DDLEventInfo, adding a query field after the table fields.

InsertRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java

public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {

    /**
     * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
     *
     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
     * @param eventInfo An event whose value will become the contents of the flow file
     * @return The next available CDC sequence ID for use by the CDC processor
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong seqId = new AtomicLong(currentSequenceId);
        for (Serializable[] row : eventInfo.getRows()) {

            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, outputStream -> {

                super.startJson(outputStream, eventInfo);
                super.writeJson(eventInfo);

                final BitSet bitSet = eventInfo.getIncludedColumns();
                writeRow(eventInfo, row, bitSet);

                super.endJson();
            });

            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
            session.transfer(flowFile, relationship);
            session.getProvenanceReporter().receive(flowFile, transitUri);
            seqId.getAndIncrement();
        }
        return seqId.get();
    }

    protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");
        int i = includedColumns.nextSetBit(0);
        while (i != -1) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", i + 1);
            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
            Integer columnType = null;
            if (columnDefinition != null) {
                jsonGenerator.writeStringField("name", columnDefinition.getName());
                columnType = columnDefinition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }
            if (row[i] == null) {
                jsonGenerator.writeNullField("value");
            } else {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
            }
            jsonGenerator.writeEndObject();
            i = includedColumns.nextSetBit(i + 1);
        }
        jsonGenerator.writeEndArray();
    }
}
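  • InsertRowsWriter extends AbstractBinlogTableEventWriter. Its writeEvent method writes an InsertRowsEventInfo, creating one flow file per row and advancing the sequence ID once per row. Its writeRow method emits a columns array that covers only the columns whose bits are set in the included-columns BitSet, using 1-based column IDs.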
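
The column loop in writeRow relies on BitSet.nextSetBit, which returns -1 once no further bit is set. A minimal, JDK-only sketch of that traversal follows. IncludedColumnsSketch and describeRow are hypothetical names for illustration, and the "id=value" strings stand in for the JSON objects that the real writer produces.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical stand-in, not a NiFi class: shows the BitSet traversal used by writeRow.
class IncludedColumnsSketch {

    // Returns one "id=value" string per included column, with a 1-based id.
    static List<String> describeRow(Serializable[] row, BitSet includedColumns) {
        List<String> out = new ArrayList<>();
        // nextSetBit(0) finds the first set bit; nextSetBit(i + 1) finds the next one.
        // The loop ends when nextSetBit returns -1.
        for (int i = includedColumns.nextSetBit(0); i != -1; i = includedColumns.nextSetBit(i + 1)) {
            out.add((i + 1) + "=" + row[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        Serializable[] row = {7, "alice", null};
        BitSet included = new BitSet();
        included.set(0);
        included.set(2);
        // Column 2 is skipped because its bit is not set.
        // prints [1=7, 3=null]
        System.out.println(describeRow(row, included));
    }
}
```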

UpdateRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java

public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> {

    /**
     * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
     *
     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
     * @param eventInfo An event whose value will become the contents of the flow file
     * @return The next available CDC sequence ID for use by the CDC processor
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong seqId = new AtomicLong(currentSequenceId);
        for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {

            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, outputStream -> {

                super.startJson(outputStream, eventInfo);
                super.writeJson(eventInfo);

                final BitSet bitSet = eventInfo.getIncludedColumns();
                writeRow(eventInfo, row, bitSet);

                super.endJson();
            });

            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
            session.transfer(flowFile, relationship);
            session.getProvenanceReporter().receive(flowFile, transitUri);
            seqId.getAndIncrement();
        }
        return seqId.get();
    }

    protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {

        jsonGenerator.writeArrayFieldStart("columns");
        int i = includedColumns.nextSetBit(0);
        while (i != -1) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", i + 1);
            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
            Integer columnType = null;
            if (columnDefinition != null) {
                jsonGenerator.writeStringField("name", columnDefinition.getName());
                columnType = columnDefinition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }
            Serializable[] oldRow = row.getKey();
            Serializable[] newRow = row.getValue();

            if (oldRow[i] == null) {
                jsonGenerator.writeNullField("last_value");
            } else {
                jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
            }

            if (newRow[i] == null) {
                jsonGenerator.writeNullField("value");
            } else {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
            }
            jsonGenerator.writeEndObject();
            i = includedColumns.nextSetBit(i + 1);
        }
        jsonGenerator.writeEndArray();
    }
}
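  • UpdateRowsWriter extends AbstractBinlogTableEventWriter. Its writeEvent method writes an UpdateRowsEventInfo, again one flow file per row. Each row is a Map.Entry whose key holds the old values and whose value holds the new values; writeRow writes them as last_value and value for each included column.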
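
The old/new pairing can be sketched with the JDK alone. UpdateRowSketch and describeUpdate are hypothetical names for illustration; the sketch assumes, as the writer above does, that the entry key is the row before the update and the entry value is the row after it.

```java
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not a NiFi class: pairs old and new values per column.
class UpdateRowSketch {

    static List<String> describeUpdate(Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) {
        Serializable[] oldRow = row.getKey();   // values before the update
        Serializable[] newRow = row.getValue(); // values after the update
        List<String> out = new ArrayList<>();
        for (int i = includedColumns.nextSetBit(0); i != -1; i = includedColumns.nextSetBit(i + 1)) {
            out.add("id=" + (i + 1) + " last_value=" + oldRow[i] + " value=" + newRow[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        Serializable[] before = {1, "alice"};
        Serializable[] after = {1, "bob"};
        BitSet included = new BitSet();
        included.set(0, 2); // sets bits 0 and 1; the end index is exclusive
        // prints [id=1 last_value=1 value=1, id=2 last_value=alice value=bob]
        System.out.println(describeUpdate(new AbstractMap.SimpleEntry<>(before, after), included));
    }
}
```

Note that unchanged columns still appear with identical last_value and value; the loop is driven by the included-columns BitSet, not by which values differ.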

DeleteRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java

public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> {

    /**
     * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set
     *
     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
     * @param eventInfo An event whose value will become the contents of the flow file
     * @return The next available CDC sequence ID for use by the CDC processor
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong seqId = new AtomicLong(currentSequenceId);
        for (Serializable[] row : eventInfo.getRows()) {

            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, outputStream -> {

                super.startJson(outputStream, eventInfo);
                super.writeJson(eventInfo);

                final BitSet bitSet = eventInfo.getIncludedColumns();
                writeRow(eventInfo, row, bitSet);

                super.endJson();
            });

            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
            session.transfer(flowFile, relationship);
            session.getProvenanceReporter().receive(flowFile, transitUri);
            seqId.getAndIncrement();
        }
        return seqId.get();
    }

    protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");
        int i = includedColumns.nextSetBit(0);
        while (i != -1) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", i + 1);
            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
            Integer columnType = null;
            if (columnDefinition != null) {
                jsonGenerator.writeStringField("name", columnDefinition.getName());
                columnType = columnDefinition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }
            if (row[i] == null) {
                jsonGenerator.writeNullField("value");
            } else {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
            }
            jsonGenerator.writeEndObject();
            i = includedColumns.nextSetBit(i + 1);
        }
        jsonGenerator.writeEndArray();
    }
}
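  • DeleteRowsWriter extends AbstractBinlogTableEventWriter. Its writeEvent method writes a DeleteRowsEventInfo, one flow file per row, and its writeRow method has the same shape as the one in InsertRowsWriter.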
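
The writers differ in how they advance the CDC sequence ID: the default and DDL implementations return currentSequenceId + 1, while the row writers increment once per row. The following JDK-only sketch illustrates that bookkeeping. SequenceIdSketch, writeSingle, and writeRows are hypothetical names, and the list of emitted IDs stands in for the flow files that the real writers create.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not a NiFi class: models only the sequence ID bookkeeping.
class SequenceIdSketch {

    // Single-event writers: one flow file tagged with the current ID, next ID is current + 1.
    static long writeSingle(long currentSequenceId, List<Long> emittedIds) {
        emittedIds.add(currentSequenceId);
        return currentSequenceId + 1;
    }

    // Row writers: one flow file per row, each tagged with its own ID.
    static long writeRows(int rowCount, long currentSequenceId, List<Long> emittedIds) {
        long seqId = currentSequenceId;
        for (int r = 0; r < rowCount; r++) {
            emittedIds.add(seqId);
            seqId++;
        }
        return seqId; // the next available ID
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        long next = writeSingle(10L, emitted); // e.g. a DDL event
        next = writeRows(3, next, emitted);    // e.g. an insert event with three rows
        // prints [10, 11, 12, 13] next=14
        System.out.println(emitted + " next=" + next);
    }
}
```

An event with zero rows leaves the sequence ID unchanged in this model, which matches the loop-based return value in the row writers above.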

Summary

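AbstractBinlogTableEventWriter extends AbstractBinlogEventWriter, with its type parameter bounded by BinlogTableEventInfo. It adds the table-level JSON fields shared by all table events and has four subclasses: DDLEventWriter, InsertRowsWriter, UpdateRowsWriter, and DeleteRowsWriter.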

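Original-content notice: the author has authorized publication of this article on the Tencent Cloud Developer Community. It may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.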