
A Look at Flink's JDBCOutputFormat

Original post by code4it · Published 2018-12-04 15:05:51
From the column: 码匠的流水账

This article takes a look at Flink's JDBCOutputFormat.

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * OutputFormat to write Rows into a JDBC database.
 * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
 *
 * @see Row
 * @see DriverManager
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
    private static final long serialVersionUID = 1L;
    static final int DEFAULT_BATCH_INTERVAL = 5000;
​
    private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
​
    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String query;
    private int batchInterval = DEFAULT_BATCH_INTERVAL;
​
    private Connection dbConn;
    private PreparedStatement upload;
​
    private int batchCount = 0;
​
    private int[] typesArray;
​
    public JDBCOutputFormat() {
    }
​
    @Override
    public void configure(Configuration parameters) {
    }
​
    /**
     * Connects to the target database and initializes the prepared statement.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException Thrown, if the output could not be opened due to an
     * I/O problem.
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            establishConnection();
            upload = dbConn.prepareStatement(query);
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }
​
    private void establishConnection() throws SQLException, ClassNotFoundException {
        Class.forName(drivername);
        if (username == null) {
            dbConn = DriverManager.getConnection(dbURL);
        } else {
            dbConn = DriverManager.getConnection(dbURL, username, password);
        }
    }
​
    /**
     * Adds a record to the prepared statement.
     *
     * <p>When this method is called, the output format is guaranteed to be opened.
     *
     * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
     * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
     *
     * @param row The records to add to the output.
     * @see PreparedStatement
     * @throws IOException Thrown, if the records could not be added due to an I/O problem.
     */
    @Override
    public void writeRecord(Row row) throws IOException {
​
        if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
            LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
        }
        try {
​
            if (typesArray == null) {
                // no types provided
                for (int index = 0; index < row.getArity(); index++) {
                    LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
                    upload.setObject(index + 1, row.getField(index));
                }
            } else {
                // types provided
                for (int index = 0; index < row.getArity(); index++) {
​
                    if (row.getField(index) == null) {
                        upload.setNull(index + 1, typesArray[index]);
                    } else {
                        // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
                        switch (typesArray[index]) {
                            case java.sql.Types.NULL:
                                upload.setNull(index + 1, typesArray[index]);
                                break;
                            case java.sql.Types.BOOLEAN:
                            case java.sql.Types.BIT:
                                upload.setBoolean(index + 1, (boolean) row.getField(index));
                                break;
                            case java.sql.Types.CHAR:
                            case java.sql.Types.NCHAR:
                            case java.sql.Types.VARCHAR:
                            case java.sql.Types.LONGVARCHAR:
                            case java.sql.Types.LONGNVARCHAR:
                                upload.setString(index + 1, (String) row.getField(index));
                                break;
                            case java.sql.Types.TINYINT:
                                upload.setByte(index + 1, (byte) row.getField(index));
                                break;
                            case java.sql.Types.SMALLINT:
                                upload.setShort(index + 1, (short) row.getField(index));
                                break;
                            case java.sql.Types.INTEGER:
                                upload.setInt(index + 1, (int) row.getField(index));
                                break;
                            case java.sql.Types.BIGINT:
                                upload.setLong(index + 1, (long) row.getField(index));
                                break;
                            case java.sql.Types.REAL:
                                upload.setFloat(index + 1, (float) row.getField(index));
                                break;
                            case java.sql.Types.FLOAT:
                            case java.sql.Types.DOUBLE:
                                upload.setDouble(index + 1, (double) row.getField(index));
                                break;
                            case java.sql.Types.DECIMAL:
                            case java.sql.Types.NUMERIC:
                                upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
                                break;
                            case java.sql.Types.DATE:
                                upload.setDate(index + 1, (java.sql.Date) row.getField(index));
                                break;
                            case java.sql.Types.TIME:
                                upload.setTime(index + 1, (java.sql.Time) row.getField(index));
                                break;
                            case java.sql.Types.TIMESTAMP:
                                upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
                                break;
                            case java.sql.Types.BINARY:
                            case java.sql.Types.VARBINARY:
                            case java.sql.Types.LONGVARBINARY:
                                upload.setBytes(index + 1, (byte[]) row.getField(index));
                                break;
                            default:
                                upload.setObject(index + 1, row.getField(index));
                                LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
                                    typesArray[index], index + 1, row.getField(index));
                                // case java.sql.Types.SQLXML
                                // case java.sql.Types.ARRAY:
                                // case java.sql.Types.JAVA_OBJECT:
                                // case java.sql.Types.BLOB:
                                // case java.sql.Types.CLOB:
                                // case java.sql.Types.NCLOB:
                                // case java.sql.Types.DATALINK:
                                // case java.sql.Types.DISTINCT:
                                // case java.sql.Types.OTHER:
                                // case java.sql.Types.REF:
                                // case java.sql.Types.ROWID:
                                // case java.sql.Types.STRUCT:
                        }
                    }
                }
            }
            upload.addBatch();
            batchCount++;
        } catch (SQLException e) {
            throw new RuntimeException("Preparation of JDBC statement failed.", e);
        }
​
        if (batchCount >= batchInterval) {
            // execute batch
            flush();
        }
    }
​
    void flush() {
        try {
            upload.executeBatch();
            batchCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException("Execution of JDBC statement failed.", e);
        }
    }
​
    int[] getTypesArray() {
        return typesArray;
    }
​
    /**
     * Executes prepared statement and closes all resources of this instance.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    @Override
    public void close() throws IOException {
        if (upload != null) {
            flush();
            // close the connection
            try {
                upload.close();
            } catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                upload = null;
            }
        }
​
        if (dbConn != null) {
            try {
                dbConn.close();
            } catch (SQLException se) {
                LOG.info("JDBC connection could not be closed: " + se.getMessage());
            } finally {
                dbConn = null;
            }
        }
    }
​
    public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
        return new JDBCOutputFormatBuilder();
    }
​
    //......
}
  • JDBCOutputFormat extends RichOutputFormat, parameterized with org.apache.flink.types.Row
  • open() calls establishConnection to load the JDBC driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain upload (a PreparedStatement)
  • writeRecord first checks whether typesArray was supplied; if not, it falls back to setObject for every field; if it was, each value is cast according to the corresponding SQL type, covering most of the constants in java.sql.Types
  • writeRecord buffers records with PreparedStatement.addBatch; once batchCount reaches batchInterval (default 5000), flush() is triggered, which calls PreparedStatement.executeBatch and resets batchCount; so that records below the batchInterval threshold are not lost, close() runs flush() one last time before closing the PreparedStatement and the Connection
  • JDBCOutputFormat provides a JDBCOutputFormatBuilder for convenient construction, as the sketch below shows
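
To make this concrete, here is a minimal, hand-driven sketch of the lifecycle above. The driver class, JDBC URL, credentials and the books(id, title) table are illustrative assumptions, not taken from the Flink sources:

// Minimal sketch of the JDBCOutputFormat lifecycle; connection settings,
// driver and the books table are hypothetical.
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

public class JdbcOutputFormatDemo {
    public static void main(String[] args) throws Exception {
        JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test")
                .setUsername("root")
                .setPassword("secret")
                .setQuery("INSERT INTO books (id, title) VALUES (?, ?)")
                .setSqlTypes(new int[]{java.sql.Types.INTEGER, java.sql.Types.VARCHAR})
                .setBatchInterval(2) // flush after every 2 records instead of the default 5000
                .finish();

        format.open(0, 1);                       // establishConnection + prepareStatement
        format.writeRecord(Row.of(1, "Flink"));  // buffered via PreparedStatement.addBatch
        format.writeRecord(Row.of(2, "JDBC"));   // batchCount reaches batchInterval -> flush()
        format.close();                          // flushes any remainder, then closes statement and connection
    }
}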

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have arbitrary number of fields and contain a set of fields, which may all be
 * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
 * type extraction mechanism can't extract correct field types. So that users should manually
 * tell Flink the type information via creating a {@link RowTypeInfo}.
 *
 * <p>
 * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
 * set fields by {@link #setField(int, Object)}.
 * <p>
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 *
 */
@PublicEvolving
public class Row implements Serializable{
​
    private static final long serialVersionUID = 1L;
​
    /** The array to store actual values. */
    private final Object[] fields;
​
    /**
     * Create a new Row instance.
     * @param arity The number of fields in the Row
     */
    public Row(int arity) {
        this.fields = new Object[arity];
    }
​
    /**
     * Get the number of fields in the Row.
     * @return The number of fields in the Row.
     */
    public int getArity() {
        return fields.length;
    }
​
    /**
     * Gets the field at the specified position.
     * @param pos The position of the field, 0-based.
     * @return The field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public Object getField(int pos) {
        return fields[pos];
    }
​
    /**
     * Sets the field at the specified position.
     *
     * @param pos The position of the field, 0-based.
     * @param value The value to be assigned to the field at the specified position.
     * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
     */
    public void setField(int pos, Object value) {
        fields[pos] = value;
    }
​
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(StringUtils.arrayAwareToString(fields[i]));
        }
        return sb.toString();
    }
​
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
​
        Row row = (Row) o;
​
        return Arrays.deepEquals(fields, row.fields);
    }
​
    @Override
    public int hashCode() {
        return Arrays.deepHashCode(fields);
    }
​
    /**
     * Creates a new Row and assigns the given values to the Row's fields.
     * This is more convenient than using the constructor.
     *
     * <p>For example:
     *
     * <pre>
     *     Row.of("hello", true, 1L);}
     * </pre>
     * instead of
     * <pre>
     *     Row row = new Row(3);
     *     row.setField(0, "hello");
     *     row.setField(1, true);
     *     row.setField(2, 1L);
     * </pre>
     *
     */
    public static Row of(Object... values) {
        Row row = new Row(values.length);
        for (int i = 0; i < values.length; i++) {
            row.setField(i, values[i]);
        }
        return row;
    }
​
    /**
     * Creates a new Row which copied from another row.
     * This method does not perform a deep copy.
     *
     * @param row The row being copied.
     * @return The cloned new Row
     */
    public static Row copy(Row row) {
        final Row newRow = new Row(row.fields.length);
        System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
        return newRow;
    }
​
    /**
     * Creates a new Row with projected fields from another row.
     * This method does not perform a deep copy.
     *
     * @param fields fields to be projected
     * @return the new projected Row
     */
    public static Row project(Row row, int[] fields) {
        final Row newRow = new Row(fields.length);
        for (int i = 0; i < fields.length; i++) {
            newRow.fields[i] = row.fields[fields[i]];
        }
        return newRow;
    }
}
  • Row is the record type written by JDBCOutputFormat.writeRecord; internally it stores field values in an Object array, and it offers static helpers such as of, copy and project, as the sketch below shows
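
A quick sketch of those helpers; the printed values follow from the toString implementation shown above:

// Small sketch exercising Row.of, Row.copy and Row.project.
import org.apache.flink.types.Row;

public class RowDemo {
    public static void main(String[] args) {
        Row row = Row.of("hello", true, 1L);  // arity 3, backed by an Object[]
        System.out.println(row.getArity());   // 3
        System.out.println(row.getField(0));  // hello

        Row shallow = Row.copy(row);                        // copies the array, not the field objects
        Row projected = Row.project(row, new int[]{2, 0});  // picks fields 2 and 0
        System.out.println(shallow);    // hello,true,1
        System.out.println(projected);  // 1,hello
    }
}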

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

    /**
     * Builder for a {@link JDBCOutputFormat}.
     */
    public static class JDBCOutputFormatBuilder {
        private final JDBCOutputFormat format;
​
        protected JDBCOutputFormatBuilder() {
            this.format = new JDBCOutputFormat();
        }
​
        public JDBCOutputFormatBuilder setUsername(String username) {
            format.username = username;
            return this;
        }
​
        public JDBCOutputFormatBuilder setPassword(String password) {
            format.password = password;
            return this;
        }
​
        public JDBCOutputFormatBuilder setDrivername(String drivername) {
            format.drivername = drivername;
            return this;
        }
​
        public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
            format.dbURL = dbURL;
            return this;
        }
​
        public JDBCOutputFormatBuilder setQuery(String query) {
            format.query = query;
            return this;
        }
​
        public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
            format.batchInterval = batchInterval;
            return this;
        }
​
        public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
            format.typesArray = typesArray;
            return this;
        }
​
        /**
         * Finalizes the configuration and checks validity.
         *
         * @return Configured JDBCOutputFormat
         */
        public JDBCOutputFormat finish() {
            if (format.username == null) {
                LOG.info("Username was not supplied.");
            }
            if (format.password == null) {
                LOG.info("Password was not supplied.");
            }
            if (format.dbURL == null) {
                throw new IllegalArgumentException("No database URL supplied.");
            }
            if (format.query == null) {
                throw new IllegalArgumentException("No query supplied.");
            }
            if (format.drivername == null) {
                throw new IllegalArgumentException("No driver supplied.");
            }
​
            return format;
        }
    }
  • JDBCOutputFormatBuilder provides builder methods for the username, password, dbURL, query, drivername, batchInterval and typesArray properties; finish() then validates the configuration, as the sketch below demonstrates
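
For instance, finish() rejects a missing query with an IllegalArgumentException; a minimal sketch (the H2 driver and URL are illustrative assumptions):

// Sketch of finish() validation: dbURL, query and drivername are mandatory,
// while a missing username/password is merely logged at INFO level.
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class BuilderValidationDemo {
    public static void main(String[] args) {
        try {
            JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.h2.Driver")  // hypothetical driver
                    .setDBUrl("jdbc:h2:mem:test")
                    // setQuery(...) deliberately omitted
                    .finish();
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());  // "No query supplied."
        }
    }
}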

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/**
 * An at-least-once Table sink for JDBC.
 *
 * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent queries
 * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
 * achieve exactly-once semantic.</p>
 */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
​
    private final JDBCOutputFormat outputFormat;
​
    private String[] fieldNames;
    private TypeInformation[] fieldTypes;
​
    JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }
​
    public static JDBCAppendTableSinkBuilder builder() {
        return new JDBCAppendTableSinkBuilder();
    }
​
    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream
                .addSink(new JDBCSinkFunction(outputFormat))
                .name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
    }
​
    @Override
    public void emitDataSet(DataSet<Row> dataSet) {
        dataSet.output(outputFormat);
    }
​
    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
​
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }
​
    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }
​
    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        int[] types = outputFormat.getTypesArray();
​
        String sinkSchema =
            String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String tableSchema =
            String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
        String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
            "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);
​
        Preconditions.checkArgument(fieldTypes.length == types.length, msg);
        for (int i = 0; i < types.length; ++i) {
            Preconditions.checkArgument(
                JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
                msg);
        }
​
        JDBCAppendTableSink copy;
        try {
            copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
​
        copy.fieldNames = fieldNames;
        copy.fieldTypes = fieldTypes;
        return copy;
    }
​
    @VisibleForTesting
    JDBCOutputFormat getOutputFormat() {
        return outputFormat;
    }
}
  • JDBCAppendTableSink wraps a JDBCOutputFormat and implements both the AppendStreamTableSink and BatchTableSink interfaces
  • Its emitDataStream method attaches a JDBCSinkFunction sink to the given dataStream, while emitDataSet sets the outputFormat as the dataSet's output
  • It also implements the TableSink methods getOutputType, getFieldNames, getFieldTypes and configure (BatchTableSink extends TableSink); configure checks that the table's field types match the JDBCOutputFormat's SQL types, then clones the outputFormat into a new, configured JDBCAppendTableSink; a usage sketch follows below
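
A usage sketch along the lines of the Flink 1.7 documentation; the Derby settings and the jdbcOutputTable/books names are illustrative assumptions:

// Sketch of building a JDBCAppendTableSink; connection settings and
// table/field names are hypothetical.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

public class TableSinkDemo {
    public static void main(String[] args) {
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:ebookshop")
                .setQuery("INSERT INTO books (id) VALUES (?)")
                .setParameterTypes(Types.INT)  // mapped to java.sql.Types for the underlying JDBCOutputFormat
                .build();

        // With a StreamTableEnvironment (tableEnv, assumed to exist):
        // tableEnv.registerTableSink("jdbcOutputTable",
        //         new String[]{"id"}, new TypeInformation<?>[]{Types.INT}, sink);
        // table.insertInto("jdbcOutputTable");
    }
}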

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
    final JDBCOutputFormat outputFormat;
​
    JDBCSinkFunction(JDBCOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }
​
    @Override
    public void invoke(Row value) throws Exception {
        outputFormat.writeRecord(value);
    }
​
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }
​
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
​
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }
​
    @Override
    public void close() throws Exception {
        outputFormat.close();
        super.close();
    }
}
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush so buffered records are committed at every checkpoint; see the sketch below
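
Since flush() otherwise runs only when batchInterval is reached or on close(), enabling checkpointing bounds how long rows can sit in the batch buffer; a minimal sketch (the interval value is an arbitrary assumption):

// Sketch: with checkpointing enabled, each snapshotState triggers
// JDBCOutputFormat.flush(), so buffered rows are committed at least once per checkpoint.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);  // checkpoint (and thus flush) every 10 seconds
        // ... build the pipeline, attach the JDBC sink
        //     (e.g. via JDBCAppendTableSink.emitDataStream), then env.execute(...)
    }
}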

Summary

  • JDBCOutputFormat extends RichOutputFormat; open() calls establishConnection to load the driver and initialize dbConn, then obtains upload (a PreparedStatement) via dbConn.prepareStatement(query); writeRecord buffers with PreparedStatement.addBatch, and when batchCount reaches batchInterval (default 5000) flush() runs PreparedStatement.executeBatch and resets batchCount; to avoid losing records that never reach batchInterval, close() flushes once more before closing the PreparedStatement and the Connection
  • Row is the record type written by JDBCOutputFormat.writeRecord; it stores field values in an Object array
  • JDBCOutputFormatBuilder provides builder methods for the username, password, dbURL, query, drivername, batchInterval and typesArray properties
  • JDBCAppendTableSink wraps a JDBCOutputFormat; its emitDataStream method attaches a JDBCSinkFunction sink to the dataStream, while emitDataSet sets the outputFormat as the dataSet's output
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface; invoke delegates to JDBCOutputFormat.writeRecord, while snapshotState calls JDBCOutputFormat.flush to commit buffered records promptly


Original statement: This article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, please contact cloudcommunity@tencent.com for removal.
