Flink Connector for GBase8c

Author: 麒思妙想
Published 2022-11-11 16:24:01
Included in the column: 麒思妙想

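When did I last post? Apparently, the last time. After going quiet for so long I felt some pressure about coming back at first; as time went on I grew more at ease with it, but sooner or later it had to be faced, so here I am...

For various reasons my research focus has shifted somewhat, and it is now mainly distributed cluster architecture and compilers. Readers who have followed me for a while may also have noticed that JimSQL recently received a major update; anyone interested is welcome to get in touch and discuss it.

Alright, today let's build a Flink connector for GBase8c step by step. Buckle up, everyone... Let's go!!!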

GBase8c

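GBase 8c is the short name of the distributed transactional database management system from GBASE (南大通用). It is a distributed transactional database cluster with a shared-nothing architecture. It offers high performance, high availability, elastic scaling, and strong security, and can be deployed on physical machines, virtual machines, containers, and private or public clouds, providing secure, stable, and reliable data storage and management for core financial systems, internet business systems, and government and enterprise systems.

Technical features

As a financial-grade distributed transactional database, GBase 8c provides strongly consistent global transactions, separation of compute and storage, flexible data distribution, flexible deployment options, online scale-out and scale-in, online upgrades, high data availability, strong security, geo-distributed active-active deployment, efficient data loading, cluster backup and recovery, easy maintenance, standards compliance, and compatibility with China's domestic technology ecosystem.

Strongly consistent global transactions

GBase 8c uses the two-phase commit protocol together with global transaction IDs to keep global transactions strongly consistent: every cross-node transaction either succeeds on all nodes or fails on all of them, so it never commits on some nodes while failing on others. Transaction processing is also fault tolerant: if a node fails while processing a transaction, another node continues the unfinished work, and the application does not need to resend the request.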
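To make the two-phase commit idea concrete, here is a toy, single-process illustration of the general protocol: the coordinator first asks every participant to prepare, and only if all of them vote yes does it tell them to commit; otherwise it tells all of them to roll back. The Participant and TwoPhaseCommit classes are hypothetical. This is not GBase 8c's implementation, which additionally needs durable logs, global transaction IDs, timeouts, and the failover described above.

```java
import java.util.List;

/** A hypothetical participant (data node) in a distributed transaction. */
class Participant {
    final String name;
    private final boolean willPrepare; // whether this node votes "yes" in phase 1
    String state = "ACTIVE";           // ACTIVE -> PREPARED -> COMMITTED, or ABORTED

    Participant(String name, boolean willPrepare) {
        this.name = name;
        this.willPrepare = willPrepare;
    }

    /** Phase 1: vote on whether the local part of the transaction can commit. */
    boolean prepare() {
        if (willPrepare) {
            state = "PREPARED";
        }
        return willPrepare;
    }

    void commit() {
        state = "COMMITTED";
    }

    void rollback() {
        state = "ABORTED";
    }
}

/** Toy coordinator: either every participant commits or every participant rolls back. */
class TwoPhaseCommit {
    static boolean run(List<Participant> participants) {
        // Phase 1: ask everyone to prepare; a single "no" vote decides the outcome.
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: send the same decision to all participants.
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }
}
```

Because the decision is made once and then applied everywhere, no run can end with some participants COMMITTED and others ABORTED.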

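Separation of compute and storage

GBase 8c uses a shared-nothing architecture in which compute and storage are separated, so compute capacity and storage capacity can each be scaled out horizontally according to business needs, lowering the total cost of ownership.

Flexible data distribution

Users can choose a data distribution strategy to suit the business scenario and get the best balance of performance, reliability, and flexibility.

GBase 8c supports replicated tables and distributed tables. Replicated tables store read-only or read-mostly data; since every node holds a full copy, they can be joined with distributed tables locally, which greatly improves query performance. Distributed tables store large tables: their rows are spread across the storage nodes by hash or other methods, which reduces the data volume per node and improves read and write performance.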
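The difference between the two table types can be sketched as a routing decision. The toy below assumes a fixed list of data nodes and a simple hashCode-modulo scheme (GBase 8c's real hash function and node mapping are not described here): a row of a distributed table goes to exactly one node chosen from its distribution key, while a row of a replicated table goes to every node. RowRouter is a hypothetical name.

```java
import java.util.List;

/** Toy router illustrating hash-distributed vs. replicated tables. */
class RowRouter {
    private final List<String> dataNodes;

    RowRouter(List<String> dataNodes) {
        this.dataNodes = dataNodes;
    }

    /** Distributed table: exactly one node, chosen by hashing the distribution key. */
    String nodeForKey(Object distributionKey) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(distributionKey.hashCode(), dataNodes.size());
        return dataNodes.get(index);
    }

    /** Replicated table: every node keeps a full copy, so a write goes to all of them. */
    List<String> nodesForReplicatedRow() {
        return dataNodes;
    }
}
```

Since the same key always maps to the same node, a lookup by distribution key touches a single node, and because each node also holds the whole replicated table, joining the two can happen on each node without moving data.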

Configure GBase8c

# Create a user
gbase=# create user jacky with password 'jacky';
ERROR:  Password must contain at least three kinds of characters.

# The password must satisfy the password policy
gbase=# create user jacky with password 'Jacky1288@';
NOTICE:  The encrypted password contains MD5 ciphertext, which is not secure.
CREATE ROLE

# Create the database
gbase=# create database test owner jacky;
CREATE DATABASE

# Grant privileges
gbase=# grant all privileges on database test to jacky;
GRANT

gbase=# alter role jacky createdb;
ALTER ROLE

gbase=# grant all privileges to jacky;
ALTER ROLE
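The first create user attempt was rejected by the password complexity check. As a rough client-side sketch of a "three kinds of characters" rule, assuming the four kinds are uppercase letters, lowercase letters, digits, and everything else (special characters), the check could look like this. PasswordPolicy is a hypothetical helper; the server's actual policy, including any minimum length, may differ.

```java
/** Hypothetical pre-check mirroring a "at least three kinds of characters" rule. */
class PasswordPolicy {
    /** Counts how many of the four character kinds occur in the password. */
    static int characterKinds(String password) {
        boolean upper = false, lower = false, digit = false, special = false;
        for (char c : password.toCharArray()) {
            if (Character.isUpperCase(c)) {
                upper = true;
            } else if (Character.isLowerCase(c)) {
                lower = true;
            } else if (Character.isDigit(c)) {
                digit = true;
            } else {
                special = true;
            }
        }
        int kinds = 0;
        if (upper) kinds++;
        if (lower) kinds++;
        if (digit) kinds++;
        if (special) kinds++;
        return kinds;
    }

    static boolean meetsThreeKinds(String password) {
        return characterKinds(password) >= 3;
    }
}
```

Under these assumptions 'jacky' contains only lowercase letters and fails, while 'Jacky1288@' contains all four kinds and passes.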

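Flink connector for GBase8c

We could simply hand-write a Sink that writes CDC data straight into the target database, but that is not very elegant. Can we load the data into GBase8c with Flink SQL instead? Yes, we can. Next, we implement a simple Flink connector for GBase8c in three steps:

  1. Build the row converter (RowConverter)
  2. Build the dialect (Dialect)
  3. Register the dynamic table factory (DynamicTableFactory) and the related Sink classes

These three steps are enough for a simple connector. Let's look at each in turn.

Build the row converter (RowConverter)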

package name.lijiaqi.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

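/**
 * Converts between JDBC values and Flink's internal RowData.
 * All per-type conversion logic is inherited from AbstractJdbcRowConverter;
 * this class only supplies the converter name.
 *
 * @author lijiaqi
 */
public class GBase8cRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public GBase8cRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "gbase8c";
    }
}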
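GBase8cRowConverter inherits its conversions from AbstractJdbcRowConverter, which builds one converter per field of the RowType up front and then applies them position by position to each JDBC row. The toy below illustrates that per-field pattern in plain Java. The FieldKind enum and the internal representations chosen here (for example, a timestamp as epoch milliseconds) are simplifications for illustration, not Flink's actual internal types such as StringData or TimestampData.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified stand-ins for a few SQL logical types. */
enum FieldKind { INT, STRING, TIMESTAMP }

/** Toy converter: one conversion function per field, created once from the row type. */
class ToyRowConverter {
    private final List<Function<Object, Object>> converters = new ArrayList<>();

    ToyRowConverter(List<FieldKind> rowType) {
        // Build converters up front so converting a record is just a walk over the fields.
        for (FieldKind kind : rowType) {
            converters.add(converterFor(kind));
        }
    }

    private static Function<Object, Object> converterFor(FieldKind kind) {
        switch (kind) {
            case INT:
                return value -> ((Number) value).intValue();
            case STRING:
                return value -> value.toString();
            case TIMESTAMP:
                // Simplified internal form: milliseconds since the epoch.
                return value -> ((Timestamp) value).getTime();
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    /** Converts one JDBC row (values as read from a ResultSet) field by field; nulls stay null. */
    Object[] toInternal(Object[] jdbcRow) {
        Object[] internal = new Object[jdbcRow.length];
        for (int i = 0; i < jdbcRow.length; i++) {
            internal[i] = jdbcRow[i] == null ? null : converters.get(i).apply(jdbcRow[i]);
        }
        return internal;
    }
}
```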

Build the dialect (Dialect)

package name.lijiaqi.dialect;

import name.lijiaqi.converter.GBase8cRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;

/**
 * JDBC dialect for GBase 8c, accessed through the openGauss JDBC driver.
 * Methods not overridden here (validate, getUpsertStatement, getRowExistsStatement,
 * getInsertIntoStatement, getUpdateStatement, getDeleteStatement, getSelectFromStatement)
 * use the JdbcDialect defaults, which build their SQL with quoteIdentifier().
 *
 * @author lijiaqi
 */
public class GBase8cDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbase8c";
    }

    @Override
    public boolean canHandle(String url) {
        // Accept openGauss URLs as well as the jdbc:postgresql: URL used in the sink DDL.
        return url.startsWith("jdbc:opengauss:") || url.startsWith("jdbc:postgresql:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBase8cRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        // PostgreSQL-style LIMIT clause
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.opengauss.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        // Identifiers take double quotes; single quotes would turn them into string literals.
        return "\"" + identifier + "\"";
    }
}
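In PostgreSQL-family SQL, identifiers are quoted with double quotes while single quotes delimit string literals, so every statement the dialect generates depends on quoteIdentifier(). The dialect also relies on JdbcDialect's default getUpsertStatement(), which returns Optional.empty(). If GBase8cSinkFunction were to use an upsert statement, an override could build one along the lines of the standalone sketch below. It is modeled on the INSERT ... ON CONFLICT ... DO UPDATE shape that Flink's built-in PostgresDialect generates, with :field named placeholders as in JdbcDialect's default statements. Whether your GBase 8c version accepts ON CONFLICT is an assumption to verify (openGauss documents INSERT ... ON DUPLICATE KEY UPDATE); UpsertSql is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: PostgreSQL-style upsert with double-quoted identifiers. */
class UpsertSql {
    static String quote(String identifier) {
        // Double quotes delimit identifiers; an embedded double quote is doubled.
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    static String upsert(String table, String[] fields, String[] keyFields) {
        List<String> keys = Arrays.asList(keyFields);
        String columns = Arrays.stream(fields).map(UpsertSql::quote).collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fields).map(f -> ":" + f).collect(Collectors.joining(", "));
        String conflictKeys = keys.stream().map(UpsertSql::quote).collect(Collectors.joining(", "));
        // Only non-key columns need updating when the key already exists.
        String updates = Arrays.stream(fields)
                .filter(f -> !keys.contains(f))
                .map(f -> quote(f) + "=EXCLUDED." + quote(f))
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + columns + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + conflictKeys + ")"
                + (updates.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updates);
    }
}
```

For the t1 table used later, upsert("t1", new String[] {"id", "name", "memo"}, new String[] {"id"}) yields INSERT INTO "t1"("id", "name", "memo") VALUES (:id, :name, :memo) ON CONFLICT ("id") DO UPDATE SET "name"=EXCLUDED."name", "memo"=EXCLUDED."memo". When every column is a key column there is nothing to update, so the sketch falls back to DO NOTHING.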

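Register the dynamic table factory (DynamicTableFactory) and the related Sink classes

First, build GBase8cDynamicTableSink. Its getSinkRuntimeProvider() wraps a GBase8cSinkFunction, which receives RowData input and writes it to the configured database.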

package name.lijiaqi.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * Table sink for GBase8c; the actual writes are done by GBase8cSinkFunction.
 *
 * @author lijiaqi
 */
public class GBase8cDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    // Not used at runtime; GBase8cDynamicTableFactory passes null here.
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBase8cDynamicTableSink(JdbcOptions jdbcOptions,
                                   EncodingFormat<SerializationSchema<RowData>> encodingFormat,
                                   DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Accept whatever changes the upstream query produces (inserts, updates, deletes).
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Hand each RowData record to GBase8cSinkFunction, which writes it to the database.
        GBase8cSinkFunction sinkFunction = new GBase8cSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBase8cDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "GBase8c Table Sink";
    }
}
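Because getChangelogMode() returns the requested mode unchanged, the sink accepts every change kind a CDC source emits, so the sink function has to turn each record's kind into a write. The sketch below shows one plausible mapping. ChangeKind is a stand-in mirroring the names of Flink's org.apache.flink.types.RowKind, and skipping UPDATE_BEFORE (relying on the UPDATE_AFTER that follows to overwrite the row by primary key) assumes updates never change the primary key; it is not a description of the author's GBase8cSinkFunction.

```java
import java.util.Optional;

/** Stand-in that mirrors the names of Flink's RowKind values. */
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** Hypothetical mapping from a change record's kind to the JDBC statement to execute. */
class ChangeToStatement {
    static Optional<String> statementFor(ChangeKind kind, String upsertSql, String deleteSql) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // New or updated row image: write it, keyed by the primary key.
                return Optional.of(upsertSql);
            case DELETE:
                // Removed row: delete it by primary key.
                return Optional.of(deleteSql);
            case UPDATE_BEFORE:
                // Old image of an updated row: the following UPDATE_AFTER overwrites it.
                return Optional.empty();
            default:
                throw new IllegalArgumentException("Unknown change kind: " + kind);
        }
    }
}
```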

Build GBase8cDynamicTableFactory

package name.lijiaqi.table;


import name.lijiaqi.dialect.GBase8cDialect;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

/**
 * Table factory registered under the identifier "gbase8c"; Flink selects it when a
 * table's WITH clause contains 'connector' = 'gbase8c'.
 *
 * @author lijiaqi
 */
public class GBase8cDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbase8c";

    private static final String DRIVER_NAME = "org.opengauss.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // Options declared here may be set in the DDL; undeclared ones fail validation.
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(DRIVER);
        return optionalOptions;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();

        // Validate all options
        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        return new GBase8cDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();

        // Validate all options
        helper.validate();

        // Get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);

        // Derive the physical row type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        // No serialization format is used, so the encoding format is null
        return new GBase8cDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                // 'driver' from the DDL if set, otherwise org.opengauss.Driver
                .setDriverName(readableConfig.get(DRIVER))
                .setDBUrl(readableConfig.get(URL))
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBase8cDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
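helper.validate() fails when a required option is missing or when the DDL sets an option that the factory declares in neither requiredOptions() nor optionalOptions(); in this factory, a 'driver' entry in the DDL is accepted only if DRIVER is declared as an optional option. A simplified standalone version of that check, assuming plain string keys and treating 'connector' as always allowed because it selects the factory itself, could look like this. OptionValidator is hypothetical, and Flink's real FactoryUtil also handles format options and other framework keys.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Simplified, hypothetical version of the option check a table factory helper performs. */
class OptionValidator {
    static void validate(Map<String, String> options, Set<String> required, Set<String> optional) {
        // Required keys that the DDL did not set.
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(options.keySet());
        // Keys the DDL set that the factory never declared.
        Set<String> unsupported = new TreeSet<>(options.keySet());
        unsupported.removeAll(required);
        unsupported.removeAll(optional);
        unsupported.remove("connector");
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unsupported);
        }
    }
}
```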

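Next, register the factory through SPI: create the file META-INF/services/org.apache.flink.table.factories.Factory under the resources directory (src/main/resources in a standard Maven layout), containing a single line: name.lijiaqi.table.GBase8cDynamicTableFactory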
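At runtime Flink loads the classes listed in such files through Java's ServiceLoader and, among the factories of the requested kind, picks the one whose factoryIdentifier() equals the table's 'connector' option; that is why the DDL later uses 'connector' = 'gbase8c'. The selection step can be sketched in isolation as below. ConnectorFactory is a hypothetical stand-in for Flink's Factory interface, and the candidates are passed in directly rather than loaded from META-INF/services.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Flink's Factory interface. */
interface ConnectorFactory {
    String factoryIdentifier();
}

/** Sketch of identifier-based selection among already-discovered factories. */
class FactoryDiscovery {
    static ConnectorFactory discover(List<ConnectorFactory> candidates, String connector) {
        List<ConnectorFactory> matches = candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory found for connector '" + connector + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Multiple factories found for connector '" + connector + "'");
        }
        return matches.get(0);
    }
}
```

If the services file is missing or misspelled, discovery ends in the "no factory found" case, which is usually the first thing to check when a custom connector is not picked up.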

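At this point the Flink connector is complete. Next, let's put it to work in a real project.

CDC in practice

The overall architecture of the project: Flink CDC captures change data from MySQL, and Flink SQL then sinks that data into GBase8c.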
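What flows through this pipeline is a changelog rather than plain rows: a MySQL INSERT arrives as an insert record, an UPDATE as an update-before/update-after pair, and a DELETE as a delete record, and the sink table converges to the source state by applying them in order, keyed by the primary key. A toy in-memory replay illustrates this; the Change class is hypothetical, and the map stands in for the GBase8c table t1.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical change record keyed by the primary key "id". */
class Change {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    final Kind kind;
    final int id;
    final String name;

    Change(Kind kind, int id, String name) {
        this.kind = kind;
        this.id = id;
        this.name = name;
    }
}

/** Applies a changelog, in arrival order, to an in-memory "table". */
class ChangelogReplay {
    static Map<Integer, String> apply(List<Change> changes) {
        Map<Integer, String> table = new TreeMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name); // insert, or overwrite by primary key
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retract the old row image; a keyed upsert sink may skip UPDATE_BEFORE,
                    // since the UPDATE_AFTER that follows overwrites the same key.
                    table.remove(c.id);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown kind: " + c.kind);
            }
        }
        return table;
    }
}
```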

Now let's see how to implement CDC with Flink SQL. It takes only three SQL statements.

Create the source table

        // Source table: MySQL binlog via the mysql-cdc connector
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " memo STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = 'dafei1288',\n" +
                        " 'database-name' = 'cdc',\n" +
                        " 'table-name' = 't1'\n" +
                        ")";

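Create the sink table that writes to GBase8c. Its connector is set to 'gbase8c', the identifier returned by GBase8cDynamicTableFactory.factoryIdentifier().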

        String url = "jdbc:postgresql://127.0.0.1:5432/test";
        String userName = "jacky";
        String password = "Jacky1288@";
        String gbase8cSinkTable = "t1";
        // Sink (target) table: GBase8c via the custom 'gbase8c' connector
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " memo STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbase8c',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbase8cSinkTable + "'\n" +
                        ")";

Here we simply insert the source data into the sink table:

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

Complete reference code

package name.lijiaqi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBase8cMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table: MySQL binlog via the mysql-cdc connector
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " memo STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = 'dafei1288',\n" +
                        " 'database-name' = 'cdc',\n" +
                        " 'table-name' = 't1'\n" +
                        ")";

        String url = "jdbc:postgresql://127.0.0.1:5432/test";
        String userName = "jacky";
        String password = "Jacky1288@";
        String gbase8cSinkTable = "t1";
        // Sink (target) table: GBase8c via the custom 'gbase8c' connector
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " memo STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbase8c',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbase8cSinkTable + "'\n" +
                        ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql() submits the INSERT job by itself. env.execute() is not needed and
        // would fail, because no DataStream operators were added to env.
        TableResult result = tableEnv.executeSql(transformSQL);
        // Keep the client running until the (unbounded) streaming job terminates.
        result.await();
    }
}



The project is hosted at the address below; stars, forks, and pull requests are welcome:

https://github.com/dafei1288/flink-connector-gbase8c

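This article is part of the Tencent Cloud self-media sharing program and was shared from the 麒思妙想 WeChat official account.
Originally published 2022-07-30. For infringement concerns, contact cloudcommunity@tencent.com for removal.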
