Data Ingestion into DLC Native Tables

Last updated: 2023-11-28 16:42:01


Use Cases

CDC (Change Data Capture) captures incremental changes in a source database and synchronizes them to other databases or applications in near real time. DLC supports using CDC to synchronize incremental changes from a source database to DLC native tables, completing data ingestion into the lake.

Prerequisites

DLC has been activated, user permissions have been configured, and managed storage has been enabled.
A DLC database has been created.
Data optimization has been configured for the DLC database; for detailed configuration, see Enabling Data Optimization.

InLong Data Ingestion

Source data can be synchronized to DLC through DataInLong. For details, see DLC Environment Preparation and Database Configuration.

Oceanus Stream Computing Data Ingestion

Source data can be synchronized to DLC through Oceanus. For details, see Data Lake Compute DLC.

Self-Built Flink Data Ingestion

Source data can be synchronized to DLC through Flink. This example demonstrates synchronizing data from a source Kafka cluster to DLC to complete data ingestion into the lake.

Environment Preparation

Dependent clusters: Kafka 2.4.x, Flink 1.15.x, Hadoop 3.x.
Purchasing EMR clusters for Kafka and Flink is recommended; for details, see Creating a Cluster.

Overall Workflow

The overall workflow consists of the following steps:



Step 1: Upload dependency JARs: upload the Kafka and DLC connector JARs and the related Hadoop dependency JARs required for synchronization.
Step 2: Create a Kafka topic: create the topic used by Kafka producers and consumers.
Step 3: Create the target table in DLC: create the target table in DLC data management.
Step 4: Submit the job: submit the synchronization job on the Flink cluster.
Step 5: Send message data and check the sync result: send message data on the Kafka cluster and view the synchronization result in DLC.

Step 1: Upload Dependency JARs

1. Download the dependency JARs.
Use dependency JARs that match your Flink version. For example, for Flink 1.15.x, download flink-sql-connector-kafka-1.15.x.jar. See the attachments for the relevant files.
Kafka dependency: flink-sql-connector-kafka-1.15.4.jar
2. Log in to the Flink cluster and upload the prepared JARs to the flink/lib directory.

Step 2: Create a Kafka Topic

Log in to Kafka Manager, click the default cluster, and then click Topic > Create.
Topic name: kafka_dlc in this example
Number of partitions: 1
Number of replicas: 1



Alternatively, log in to a Kafka cluster instance and run the following command in the kafka/bin directory to create the topic (a programmatic alternative using the Kafka AdminClient is sketched after the command).
./kafka-topics.sh --bootstrap-server ip:port --create --topic kafka_dlc
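If you would rather create the topic from code, the following is a minimal Java sketch using the Kafka AdminClient; the bootstrap address is a placeholder, and the partition and replica counts match the values used above.
// Minimal sketch (assumption: kafka-clients is on the classpath and ip:port is your broker address).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDlcTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port"); // replace with your broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition and replication factor 1, the same values used in this example
            NewTopic topic = new NewTopic("kafka_dlc", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}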

Step 3: Create the Target Table in DLC

For details on creating the target table, see DLC Native Table Operation Configuration.

Step 4: Submit the Job

Flink can synchronize data in two ways: Flink SQL write mode and the Flink Stream API. Both are described below.
Before submitting the job, create a directory for checkpoint data with the following commands. The Flink SQL example below stores checkpoints under hdfs:///flink/checkpoints; the Stream API example stores them under hdfs:///data/checkpoints, so create that directory the same way if you use it. A programmatic alternative using the Hadoop FileSystem API is sketched after the commands.
Create the HDFS /flink/checkpoints directory:
hadoop fs -mkdir /flink
hadoop fs -mkdir /flink/checkpoints
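If you prefer to create the checkpoint directory from code, the following is a minimal sketch using the Hadoop FileSystem API; it assumes the Hadoop configuration files on the classpath point at your HDFS.
// Minimal sketch (assumption: core-site.xml and hdfs-site.xml are on the classpath).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateCheckpointDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            // Equivalent to: hadoop fs -mkdir /flink && hadoop fs -mkdir /flink/checkpoints
            fs.mkdirs(new Path("/flink/checkpoints")); // mkdirs also creates missing parent directories
        }
    }
}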
Flink SQL sync mode
1. In IntelliJ IDEA, create a Maven project named "flink-demo".
2. Add the required dependencies to the pom; for details, see Complete Sample Code > Example 1.
3. Java synchronization code: the core code is shown in the steps below; for the full code, see Complete Sample Code > Example 2.
Create the execution environment and configure checkpointing:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Execute the source SQL:
tEnv.executeSql(sourceSql);
Execute the synchronization SQL:
tEnv.executeSql(sql);
4. Build and package the flink-demo project in IntelliJ IDEA; the JAR flink-demo-1.0-SNAPSHOT.jar is generated under the project's target folder.
5. Log in to one of the Flink cluster instances and upload flink-demo-1.0-SNAPSHOT.jar to the /data/jars/ directory (create the directory if it does not exist).
6. Log in to one of the Flink cluster instances and run the following command in the flink/bin directory to submit the synchronization job.
./flink run --class com.tencent.dlc.iceberg.flink.AppendIceberg /data/jars/flink-demo-1.0-SNAPSHOT.jar
Flink Stream API sync mode
1. In IntelliJ IDEA, create a Maven project named "flink-demo".
2. Add the required dependencies to the pom; for details, see Complete Sample Code > Example 3.
3. The core Java code is shown in the steps below; for the full code, see Complete Sample Code > Example 4.
Create the StreamTableEnvironment execution environment and configure checkpointing:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///data/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Get the Kafka input stream:
KafkaToDLC dlcSink = new KafkaToDLC();
DataStream<RowData> dataStreamSource = dlcSink.buildInputStream(env);
Configure the sink:
FlinkSink.forRowData(dataStreamSource)
.table(table)
.tableLoader(tableLoader)
.equalityFieldColumns(equalityColumns)
.metric(params.get(INLONG_METRIC.key()), params.get(INLONG_AUDIT.key()))
.action(actionsProvider)
.tableOptions(Configuration.fromMap(options))
// Defaults to false, which appends data; set to true to overwrite existing data
.overwrite(false)
.append();
Execute the synchronization job:
env.execute("DataStream Api Write Data To Iceberg");
4. Build and package the flink-demo project in IntelliJ IDEA; the JAR flink-demo-1.0-SNAPSHOT.jar is generated under the project's target folder.
5. Log in to one of the Flink cluster instances and upload flink-demo-1.0-SNAPSHOT.jar to the /data/jars/ directory (create the directory if it does not exist).
6. Log in to one of the Flink cluster instances and run the following command in the flink/bin directory to submit the job.
./flink run --class com.tencent.dlc.iceberg.flink.AppendIceberg /data/jars/flink-demo-1.0-SNAPSHOT.jar

Step 5: Send Message Data and Verify the Sync Result

1. Log in to a Kafka cluster instance and run the following command in the kafka/bin directory to send message data (a Java producer alternative is sketched after the sample data).
./kafka-console-producer.sh --broker-list 122.152.227.141:9092 --topic kafka_dlc
The message data is as follows:
{"id":1,"name":"Zhangsan","age":18}
{"id":2,"name":"Lisi","age":19}
{"id":3,"name":"Wangwu","age":20}
{"id":4,"name":"Lily","age":21}
{"id":5,"name":"Lucy","age":22}
{"id":6,"name":"Huahua","age":23}
{"id":7,"name":"Wawa","age":24}
{"id":8,"name":"Mei","age":25}
{"id":9,"name":"Joi","age":26}
{"id":10,"name":"Qi","age":27}
{"id":11,"name":"Ky","age":28}
{"id":12,"name":"Mark","age":29}
2. Check the synchronization result.
Open the Flink Dashboard and click Running Jobs > the running job > Checkpoints > Overview to check the job's synchronization status (a sketch that checks job status through the Flink REST API instead is shown below).
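If you prefer to check job status programmatically rather than in the dashboard, the following minimal sketch queries the Flink REST API; it assumes the JobManager REST endpoint listens on port 8081, and jobmanager-host is a placeholder for its address.
// Minimal sketch (assumption: the JobManager REST endpoint is reachable at jobmanager-host:8081).
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CheckFlinkJobs {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://jobmanager-host:8081/jobs/overview"); // replace with your JobManager address
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // The JSON response lists each job's name and state (for example, RUNNING).
            System.out.println(body);
        }
    }
}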

3. Log in to the DLC console, click Data Exploration, and query the data in the target table.


Complete Sample Code

Note:
Replace values marked with "****" in the examples with your actual data. For how to query the SecretId and SecretKey, see Access Key Management for the root account.

Example 1

<properties>
<flink.version>1.15.4</flink.version>
<cos.lakefs.plugin.version>1.0</cos.lakefs.plugin.version>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.qcloud.cos</groupId>
<artifactId>lakefs-cloud-plugin</artifactId>
<version>${cos.lakefs.plugin.version}</version>
<exclusions>
<exclusion>
<groupId>com.tencentcloudapi</groupId>
<artifactId>tencentcloud-sdk-java</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Example 2

public class AppendIceberg {

public static void main(String[] args) {
// Create the execution environment and configure checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig()
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// Create the source table
String sourceSql = "CREATE TABLE tb_kafka_sr ( \n"
+ " id INT, \n"
+ " name STRING, \n"
+ " age INT \n"
+ ") WITH ( \n"
+ " 'connector' = 'kafka', \n"
+ " 'topic' = 'kafka_dlc', \n"
+ " 'properties.bootstrap.servers' = '10.0.126.***:9092', \n" // Kafka broker IP and port
+ " 'properties.group.id' = 'test-group', \n"
+ " 'scan.startup.mode' = 'earliest-offset', \n" // start from the earliest available offset
+ " 'format' = 'json' \n"
+ ");";
tEnv.executeSql(sourceSql);

// Create the sink table
String sinkSql = "CREATE TABLE tb_dlc_sk ( \n"
+ " id INT PRIMARY KEY NOT ENFORCED, \n"
+ " name STRING,\n"
+ " age INT\n"
+ ") WITH (\n"
+ " 'qcloud.dlc.managed.account.uid' = '1000***79117',\n" // user Uid
+ " 'qcloud.dlc.secret-id' = 'AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt',\n" // user SecretId
+ " 'qcloud.dlc.region' = 'ap-***',\n" // region of the database and table
+ " 'qcloud.dlc.user.appid' = '1305***23',\n" // user AppId
+ " 'qcloud.dlc.secret-key' = 'kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP',\n" // user SecretKey
+ " 'connector' = 'iceberg-inlong', \n"
+ " 'catalog-database' = 'test_***', \n" // target database
+ " 'catalog-table' = 'kafka_dlc', \n" // target table
+ " 'default-database' = 'test_***', \n" // default database
+ " 'catalog-name' = 'HYBRIS', \n"
+ " 'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog', \n"
+ " 'uri' = 'dlc.tencentcloudapi.com', \n"
+ " 'fs.cosn.credentials.provider' = 'org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider', \n"
+ " 'qcloud.dlc.endpoint' = 'dlc.tencentcloudapi.com', \n"
+ " 'fs.lakefs.impl' = 'org.apache.hadoop.fs.CosFileSystem', \n"
+ " 'fs.cosn.impl' = 'org.apache.hadoop.fs.CosFileSystem', \n"
+ " 'fs.cosn.userinfo.region' = 'ap-guangzhou', \n" // region of the COS bucket in use
+ " 'fs.cosn.userinfo.secretId' = 'AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt', \n" // user SecretId
+ " 'fs.cosn.userinfo.secretKey' = 'kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP', \n" // user SecretKey
+ " 'service.endpoint' = 'dlc.tencentcloudapi.com', \n"
+ " 'service.secret.id' = 'AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt', \n" // user SecretId
+ " 'service.secret.key' = 'kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP', \n" // user SecretKey
+ " 'service.region' = 'ap-***', \n" // region of the database and table
+ " 'user.appid' = '1305***23', \n" // user AppId
+ " 'request.identity.token' = '1000***79117', \n"
+ " 'qcloud.dlc.jdbc.url'='jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test_***&datasource_connection_name=DataLakeCatalog&region=ap-***&data_engine_name=flink-***' \n"
+ ");";
tEnv.executeSql(sinkSql);
// Run the computation and write the output
String sql = "insert into tb_dlc_sk select * from tb_kafka_sr";
tEnv.executeSql(sql);
}

}

Example 3

<properties>
<flink.version>1.15.4</flink.version>
<kafka-version>2.4.1</kafka-version><!-- set to match your Kafka cluster version; Kafka 2.4.x is used in this example -->
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.22</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-iceberg-dlc</artifactId>
<version>1.6.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/sort-connector-iceberg-dlc-1.6.0.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>

Example 4

public class KafkaToDLC {

public static void main(String[] args) throws Exception {
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
final Map<String, String> options = setOptions();
//1. Create the StreamTableEnvironment execution environment and configure checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///data/checkpoints");
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig()
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().setGlobalJobParameters(params);

//2. Get the input stream
KafkaToDLC dlcSink = new KafkaToDLC();
DataStream<RowData> dataStreamSource = dlcSink.buildInputStream(env);

//3. Create the Hadoop and catalog configuration
CatalogLoader catalogLoader = FlinkDynamicTableFactory.createCatalogLoader(options);
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader,
TableIdentifier.of(params.get(CATALOG_DATABASE.key()), params.get(CATALOG_TABLE.key())));
tableLoader.open();
Table table = tableLoader.loadTable();
ActionsProvider actionsProvider = FlinkDynamicTableFactory.createActionLoader(
Thread.currentThread().getContextClassLoader(), options);
//4. Create the schema
Schema schema = Schema.newBuilder()
.column("id", DataTypeUtils.toInternalDataType(new IntType(false)))
.column("name", DataTypeUtils.toInternalDataType(new VarCharType()))
.column("age", DataTypeUtils.toInternalDataType(new DateType(false)))
.primaryKey("id")
.build();
List<String> equalityColumns = schema.getPrimaryKey().get().getColumnNames();
//5. Configure the sink
FlinkSink.forRowData(dataStreamSource)
// .table() can be omitted; specifying the path via tableLoader is sufficient.
.table(table)
.tableLoader(tableLoader)
.equalityFieldColumns(equalityColumns)
.metric(params.get(INLONG_METRIC.key()), params.get(INLONG_AUDIT.key()))
.action(actionsProvider)
.tableOptions(Configuration.fromMap(options))
// Defaults to false, which appends data; set to true to overwrite existing data
.overwrite(false)
.append();
//6. Run the synchronization
env.execute("DataStream Api Write Data To Iceberg");
}

private static Map<String, String> setOptions() {
Map<String, String> options = new HashMap<>();
options.put("qcloud.dlc.managed.account.uid", "1000***79117"); //用户Uid
options.put("qcloud.dlc.secret-id", "AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt"); // 用户SecretId
options.put("qcloud.dlc.region", "ap-***"); // 数据库表地域信息
options.put("qcloud.dlc.user.appid", "AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt"); // 用户SecretId
options.put("qcloud.dlc.secret-key", "kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP"); // 用户 SecretKey
options.put("connector", "iceberg-inlong");
options.put("catalog-database", "test_***"); // 目标数据库
options.put("catalog-table", "kafka_dlc"); // 目标数据表
options.put("default-database", "test_***"); //默认数据库
options.put("catalog-name", "HYBRIS");
options.put("catalog-impl", "org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog");
options.put("uri", "dlc.tencentcloudapi.com");
options.put("fs.cosn.credentials.provider", "org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider");
options.put("qcloud.dlc.endpoint", "dlc.tencentcloudapi.com");
options.put("fs.lakefs.impl", "org.apache.hadoop.fs.CosFileSystem");
options.put("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
options.put("fs.cosn.userinfo.region", "ap-guangzhou"); // 使用到的COS的地域信息
options.put("fs.cosn.userinfo.secretId", "AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt"); // 用户SecretId
options.put("fs.cosn.userinfo.secretKey", "kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP"); // 用户 SecretKey
options.put("service.endpoint", "dlc.tencentcloudapi.com");
options.put("service.secret.id", "AKIDwjQvBHCsKYXL3***pMdkeMsBH8lAJEt"); // 用户SecretId
options.put("service.secret.key", "kFWYQ5WklaCYgbLtD***cyAD7sUyNiVP"); // 用户 SecretKey
options.put("service.region", "ap-***"); // 数据库表地域信息
options.put("user.appid", "1305***23");
options.put("request.identity.token", "1000***79117");
options.put("qcloud.dlc.jdbc.url",
"jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type,SparkSQLTask&database_name,test_***&datasource_connection_name,DataLakeCatalog&region,ap-***&data_engine_name,flink-***");
return options;
}

/**
* Create the input stream
*
* @param env
* @return
*/
private DataStream<RowData> buildInputStream(StreamExecutionEnvironment env) {
//1. Configure the execution environment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env, settings);
org.apache.flink.table.api.Table table = null;
//2. Execute the SQL and get the data input stream
try {
sTableEnv.executeSql(createTableSql()).print();
table = sTableEnv.sqlQuery(transformSql());
DataStream<Row> rowStream = sTableEnv.toChangelogStream(table);
DataStream<RowData> rowDataDataStream = rowStream.map(new MapFunction<Row, RowData>() {
@Override
public RowData map(Row rows) throws Exception {
GenericRowData rowData = new GenericRowData(3);
rowData.setField(0, rows.getField(0));
rowData.setField(1, (String) rows.getField(1));
rowData.setField(2, rows.getField(2));
return rowData;
}
});
return rowDataDataStream;
} catch (Exception e) {
throw new RuntimeException("kafka to dlc transform sql execute error.", e);
}
}

private String createTableSql() {
String tableSql = "CREATE TABLE tb_kafka_sr ( \\n"
+ " id INT, \\n"
+ " name STRING, \\n"
+ " age INT \\n"
+ ") WITH ( \\n"
+ " 'connector' = 'kafka', \\n"
+ " 'topic' = 'kafka_dlc', \\n"
+ " 'properties.bootstrap.servers' = '10.0.126.30:9092', \\n"
+ " 'properties.group.id' = 'test-group-10001', \\n"
+ " 'scan.startup.mode' = 'earliest-offset', \\n"
+ " 'format' = 'json' \\n"
+ ");";
return tableSql;
}

private String transformSql() {
String transformSQL = "select * from tb_kafka_sr";
return transformSQL;
}
}