概述
基于原生表(TC-Iceberg)可以构建完整的近实时湖仓场景,包括 CDC 数据近实时入湖与近实时 Pipeline 构建。
前置条件
正确开通 DLC,已完成用户权限配置,开通托管存储。
正确配置 DLC 数据库数据优化,详细配置请参考开启数据优化。
正确配置 DLC 开启托管存储外部访问。
外部访问绑定 Oceanus/EMR 计算资源使用的 VPC。
准备 MySQL 数据库,建议购买腾讯云云数据库 MySQL,详细流程参考腾讯云数据库 MySQL 购买方式。
创建具有 SELECT、REPLICATION SLAVE 和 REPLICATION CLIENT 权限的数据库账号。
配置 MySQL 服务器开启 binlog,并将 binlog 格式符配置为 ROW、将 binlog_row_image 配置格式为 FULL。
操作流程
本篇实践将演示通过两个实时任务展示一个完整的近实时湖仓的构建过程,第一个任务会同步 MySQL 的数据到一张 TC-Iceberg 表中,第二个任务读取第一张 TC-Iceberg 表中的数据经过聚合写入第二张 TC-Iceberg 表中,实时任务的开发可以使用 Oceanus 流计算平台或者自建 Flink,具体的操作流程如下:

创建数据库表
登录 MySQL 数据库,执行下面的 SQL 初始化源端数据库表:
CREATE DATABASE `cdc_database`;CREATE TABLE `cdc_database`.`cdc_source`(`id` BIGINT, `class` VARCHAR(128), `score` INT, PRIMARY KEY(`id`));
登录 DLC,在数据探索界面,执行下面的 SQL 创建目标数据库表:
CREATE DATABASE cdc_database;CREATE TABLE cdc_database.cdc_sink(id LONG, class STRING, score INT, PRIMARY KEY(id)) using tc_iceberg;CREATE TABLE cdc_database.cdc_compute(class STRING, avg_score INT, PRIMARY KEY(class)) using tc_iceberg;
上传任务依赖
使用 Oceanus 流计算平台
1. 下载依赖:
TC-Iceberg Flink 插件:amoro-format-mixed-flink-runtime-1.16-0.8.jar。
Hive 库:hive-exec-2.3.9.jar。
元数据加速桶相关依赖:chdfs_hadoop_plugin_network-2.8.jar。
COS 访问配置 hdfs-site.xml,参考文末的 示例1。
2. 上传依赖到 Oceanus 依赖管理。
使用自建 Flink
1. 下载依赖:
TC-Iceberg Flink 插件:amoro-format-mixed-flink-runtime-1.16-0.8.jar。
COS 访问配置 hdfs-site.xml,参考文末的 示例1。
2. 登录 Flink 集群,将准备好的 Jar 上传到 flink/ib 目录下。
编写任务
使用 Oceanus 流计算平台
1. 在流计算 Oceanus 平台创建两个 Flink SQL 作业,打开作业参数进行如下配置:
添加上面上传的所有依赖。
选择 Flink 版本为 Flink-1.16。
打开 Checkpoint, 时间间隔调整为60秒。
2. 将作业内容分别替换为下面的 CDC 近实时同步 SQL 和 实时聚合 SQL。
使用自建 Flink
1. 通过 IntelliJ IDEA 新建一个名称为“flink-demo”的 Maven 项目。
2. 在 pom 中添加相关依赖,依赖详情请参考 示例2。
3. Java 同步代码:核心代码如下步骤展示,详细代码请参考 示例3。
添加两个入口类,将待执行 SQL 分别替换为下面的 CDC 近实时同步 SQL 和 实时聚合 SQL。
4. 通过 IntelliJ IDEA 对 flink-demo 项目编译打包,在项目 target 文件夹下生成 JAR 包 flink-demo-1.0-SNAPSHOT.jar。
CDC 近实时同步 SQL
CREATE CATALOG tc_iceberg_catalog WITH ('type'='mixed_iceberg','catalog-type'='hive','uri'='thrift://xxx:xxx', -- 填写 DLC 外部访问暴露的 Catalog 访问地址'table-formats'='MIXED_ICEBERG');CREATE TABLE `mysql_cdc_source` (`id` BIGINT,`class` STRING,`score` INT,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = 'xxx', -- 数据库的 IP'port' = 'xxx', -- 数据库的访问端口'username' = 'xxx', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'xxx', -- 数据库访问的密码'database-name' = 'cdc_database', -- 需要同步的数据库'table-name' = 'cdc_source' -- 需要同步的数据表名);INSERT INTO `tc_iceberg_catalog`.`cdc_database`.`cdc_sink` SELECT * FROM `mysql_cdc_source`;
实时聚合 SQL
CREATE CATALOG tc_iceberg_catalog WITH ('type'='mixed_iceberg','catalog-type'='hive','uri'='thrift://xxx:xxx', -- 填写 DLC 外部访问暴露的 Catalog 访问地址'table-formats'='MIXED_ICEBERG');INSERT INTO `tc_iceberg_catalog`.`cdc_database`.`cdc_compute`SELECT class, avg(score) AS avg_scoreFROM `tc_iceberg_catalog`.`cdc_database`.`cdc_source` GROUP BY `class`;
启动任务
使用 Oceanus 流计算平台
保存同步作业,发布草稿,等待作业启动成功,前往 Flink UI 确定任务状态正常。
使用自建 Flink
1. 登录 Flink 集群其中的一个实例,上传 flink-demo-1.0-SNAPSHOT.jar 到 /data/jars/ 目录(没有目录则新建)。
2. 登录 Flink 集群其中的一个实例,在 flink/bin 目录下执行如下命提交同步任务。
./flink run --class com.tencent.dlc.tciceberg.flink.FlinkSQLDemo /data/jars/flink-demo-1.0-SNAPSHOT.jar
验证数据
1. 登录 MySQL 数据库插入测试数据:
INSERT INTO `cdc_database`.`cdc_source` VALUES(1, 'class1', 80);INSERT INTO `cdc_database`.`cdc_source` VALUES(2, 'class1', 85);INSERT INTO `cdc_database`.`cdc_source` VALUES(3, 'class2', 85);INSERT INTO `cdc_database`.`cdc_source` VALUES(4, 'class2', 90);DELETE FROM `cdc_database`.`cdc_source` WHERE id = 1;UPDATE `cdc_database`.`cdc_source` SET `score` = 100 where id = 3;
2. 登录 DLC 控制台,单击数据探索,通过下面的 SQL 查询目标表数据:
SELECT * FROM cdc_database.cdc_sink;SELECT * FROM cdc_database.cdc_compute;
完整样例代码参考示例
示例1
访问 TC-Iceberg 托管存储需要的 hdfs-site.xml 配置:
<?xml version="1.0"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration><property><name>fs.lakefs.impl</name><value>org.apache.hadoop.fs.lakefs.CosFileSystem</value></property><property><name>fs.cosn.impl</name><value>org.apache.hadoop.fs.CosFileSystem</value></property><!-- 配置正确的可用域 --><property><name>fs.cosn.bucket.region</name><value>ap-xxx</value></property><property><name>fs.cosn.posix_bucket.fs.impl</name><value>org.apache.hadoop.fs.CosFileSystem</value></property><property><name>fs.cosn.credentials.provider</name><value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value></property><property><name>qcloud.dlc.endpoint</name><value>dlc.tencentcloudapi.com</value></property><property><name>fs.cosn.posix_bucket.fs.userinfo.region</name><value>org.apache.hadoop.fs.auth.DlcCloudCredentialsProvider</value></property><!-- 配置用户的 Secret ID --><property><name>fs.cosn.posix_bucket.fs.userinfo.secretId</name><value>xxx</value></property><!-- 配置用户的 Secret KEY --><property><name>fs.cosn.posix_bucket.fs.userinfo.secretKey</name><value>xxx</value></property></configuration>
示例2
Flink Demo 任务依赖 pom.xml。
<properties><flink.version>1.16.3</flink.version></properties><dependencies<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency></dependencies>
示例3
Flink SQL 代码示例。
public class FlinkSQLDemo {public static void main(String[] args) {// 创建执行环境 和 配置checkpointStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");env.getCheckpointConfig().setCheckpointTimeout(60000);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 替换为要执行的SQLString sql = "SQL to be excuted...";tEnv.executeSql(sourceSql);}}