文档中心>流计算 Oceanus>SQL 开发指南>整库同步(SQL)能力

整库同步(SQL)能力

最近更新时间:2023-11-15 10:37:11

我的收藏

适用场景

在实时湖仓的数据链路建设过程中,将传统关系型数据库(如 MySQL 等)中的整库数据低延迟、高吞吐的同步到下游的 OLAP 数据库(如 Doris、ClickHouse)或者归档到对应的文件系统中(如 HDFS、Hive 表),是一个非常普遍和强烈的需求。
Oceanus 提供了非常方便的 CDAS(CREATE DATABASE AS)SQL 语法,来支持将数据库中的整库数据全量导入、增量实时同步写入到用户指定的下游系统中。

语法说明

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)] -- 指明写入目标库的参数
AS DATABASE <source_catalog>.<source_database> -- source_database 是被同步的源数据库
INCLUDING { ALL TABLES | TABLE 'table_name' }
-- INCLUDING ALL TABLES 表示同步数据库中的所有表
-- INCLUDING TABLE 'table' 表示同步数据库中特定的表,支持正则表达式,如 'order_.*';
-- 同步多张表时,可以写成 INCLUDING TABLE 'tableA|tableB|tableC'的格式
[EXCLUDING TABLE 'table_name']
-- EXCLUDING TABLE 'table' 表示不同步数据库中特定的表,支持正则表达式,如 'order_.*';
-- 排除多张表时,可以写成 EXCLUDING TABLE 'tableA|tableB|tableC'的格式
[/*+ `OPTIONS`('key1'='val1', 'key2'='val2', ... ) */]
-- (可选,指明读取source的参数,如指定source serverId的范围,解析debezium时间戳字段类型等)
参数说明:
参数
解释
target_database
待写入的目标数据库名
database_comment
待写入的数据库注释
WITH参数
指明写入目标库的参数,目前会被翻译成下游 sink 表的描述参数
<source_catalog>.<source_database>
声明源 catalog 中需要同步的数据库
INCLUDING ALL TABLES
同步源库中的所有表
INCLUDING TABLE
同步数据库中特定的表,支持正则表达式,如 'order_.*' ; 同步多张表时,可以写成 INCLUDING TABLE 'tableA|tableB'格式
EXCLUDING TABLE
表示不同步数据库中特定的表,支持正则表达式,如 'order_.*'; 排除多张表时,可以写成 EXCLUDING TABLE 'tableA|tableB'格式
OPTIONS
可选,指明读取 Source 时覆盖的参数,如指定 source serverId 的范围等
注意
第三行 [WITH (key1=val1, key2=val2, ..)] 指明写入目标库的参数中, value 值支持将 Source 表的表名进行变量替换,使用方法是使用占位符 $tableName。
如下整库同步到 Doris 的示例中,写入每一个 sink 的表中 $tableName 会替换为相对应 MySQL 库中源表表名。
create catalog my_mysql with(...);
create database if not exists sink_db
with (
'connector' = 'doris',
'table.identifier' = 'db1.$tableName_doris'
...
)
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型

状态无损的加表能力(仅限 MySQL 源表)

如果您的 MySQL 数据库有较多的新增表需求,希望 Flink 可以支持新增表的数据同步,且不影响现有各表的同步状态,那么您可以使用 SET 和 OPTIONS 选项来开启该功能(本示例还开启了MySQL CDC 的多 Source 复用自动拆分最后一个大分片等高级能力,请按需使用):
-- 必须设置 table.optimizer.deterministic-operator-uid-for-cdas=true 才可以启用无损加表能力
SET table.optimizer.deterministic-operator-uid-for-cdas=true;

-- 可选:开启多 Source 复用,降低资源占用,提升稳定性
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

-- 请替换成实际的 CDAS 语句,但至少要保留 'scan.newly-added-table.enabled' = 'true' 的 OPTIONS 选项
create catalog my_mysql with(...);
create database if not exists sink_db
with (
...
)
including all tables
/*+ `OPTIONS`('scan.newly-added-table.enabled' = 'true','scan.lastchunk.optimize.enable' = 'true') */;
注意:
1. SET 参数、scan.newly-added-table.enabled 的 OPTIONS 需要在作业首次启动时就加上;否则需要丢弃现有状态,重新全量同步一次数据,后续才可无损加表。
2. 只有当作业启动时,才可以感知新增表的存在。因此每当有加表操作,请对作业做一个快照,然后从该快照恢复运行,此时新表才会开始同步。
3. 当表的数量多时,我们建议使用 SET table.optimizer.mysql-cdc-source.merge.enabled=true; 语句开启多 Source 复用能力,提升整体稳定性。
4. 当表有持续的大流量写入时,我们建议开启 'scan.lastchunk.optimize.enable' = 'true' 参数,以避免最后一个分片过大导致 TaskManager OOM。

MySQL-CDC 元数据字段读取

MySQL CDC connector 除支持提取物理表的字段外,还支持了元数据字段列表可以提取。整库同步功能中也提供了一个 options 配置参数,可以用来控制同步所需的元数据字段。
参数
解释
oceanus.source.include-metadata.fields
需要同步的 source 表的元字段,格式为 'table_name:table_name;meta.batch_id:batch_id', 元数据字段定义通过分号;分隔,每个元数据字段格式为 metadataColumn:alias, 第一部分为实际对应的元数据 column,第二部分为重命名后的值。
注意:元数据字段会按照声明的顺序,追加到源表之后。
使用实例:
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'xx',
'password' = 'xxx',
'base-url' = 'xxx'
);

create database if not exists print_sink_db
comment 'test_sink'
with (
'connector' = 'print',
'print-identifier' = '$tableName')
as database `my_mysql`.`test`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai',
'oceanus.source.include-metadata.fields'='table_name:table_name;database_name:database_name;op_ts:op_ts;meta.table_name:meta_table_name;meta.database_name:meta_database_name;meta.op_ts:meta_op_ts;meta.op_type:meta_op_type;meta.batch_id:meta_batch_id;meta.is_ddl:meta_id_ddl;meta.mysql_type:meta_mysql_type;meta.update_before:meta_update_before;meta.pk_names:meta_pk_names;meta.sql:meta_sql;meta.sql_type:meta_sql_type;meta.ts:meta_ts')
*/;

使用方法

1. 首先注册 MySQL 的 Catalog,作为待同步的数据源表,示例如下:
create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'XXX',
'password' = 'XXX',
'base-url' = 'jdbc:mysql://ip:port'
-- 'jdbc.properties.tinyInt1isBit' = 'false' -- jdbc参数,是否把tinyInt识别为bool. 默认为true.
-- 如果表字段包含tinyint(1), 建议把 jdbc.properties.tinyInt1isBit 设置为false.
);
2. 对于不支持自动建表的下游系统,需要事先保证在下游系统中建立和上游 Mysql 表中一一对应的源表。
3. 使用整库同步语法指定需要同步的同步表,其中写入目标库的参数现在只支持填入下游 connector 的必要参数。
4. 可以通过 jdbc.properties.* 传入对应的 jdbc 参数。

同步到 Hudi 示例

SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = 'XXX',
'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink'
with (
'connector' = 'hudi',
'path' = 'hdfs://namenode:8020/user/hive/warehouse/$tableName_mor'
) as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型

同步到 ClickHouse 示例

SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with ( 'type' = 'jdbc', 'default-database' = 'test', 'username' = 'root', 'password' = 'XXX', 'base-url' = 'jdbc:mysql://ip:port' );

create database if not exists sink_db comment 'test_sink' with ( 'connector' = 'clickhouse://172.11.11.11:8123', -- 如果ClickHouse集群未配置账号密码可以不指定 --'username' = 'root', -- ClickHouse集群用户名 --'password' = 'root', -- ClickHouse集群的密码 'database-name' = 'testdb', -- 数据写入目的数据库 'table-name' = 'test_table1', -- 数据写入目的数据表 'sink.batch-size' = '1000', 'table.collapsing.field' = 'Sign' ) as database my_mysql.test_db including all tables /*+ OPTIONS('server-time-zone' = 'Asia/Shanghai') */;

同步到 Doris 示例

SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = 'XXX',
'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink' with (
'connector' = 'doris',
'table.identifier' = 'trade_log.$tableName',
'username' = 'admin',
'password' = 'xxx',
'sink.batch.size' = '500',
'sink.batch.interval' = '1s',
'fenodes' = 'ip:port'
) as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型

同步到 Hive 示例

SET table.optimizer.mysql-cdc-source.merge.enabled=true;

create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = 'XXX',
'base-url' = 'jdbc:mysql://ip:port'
);

create database if not exists sink_db
comment 'test_sink' with (
'connector' = 'hive',
'hive-version' = '2.3.6',
'hive-database' = 'test_100',
'hive-table' = '$tableName',
'sink.partition-commit.policy.kind' = 'metastore'
) as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- 因为hive sink不支持变更数据,此处的hint会把原始cdc的变更数据转成成append流下发
作业运行拓扑图展开后如下:



同步到 Hive 自动建表示例:

作业需要提前配置连接 Hive 服务的 jar 包,详细配置可以参考 获取 Hive 连接配置 jar 包
SET table.optimizer.mysql-cdc-source.merge.enabled=true;

-- 注册mysql的catalog
create catalog my_mysql with (
'type' = 'jdbc',
'default-database' = 'test',
'username' = 'root',
'password' = 'XXX',
'base-url' = 'jdbc:mysql://ip:port'
);

-- 注册hive端catalog
create catalog my_hive with (
'type' = 'hive',
'default-database' = 'default',
'hive-version' = '2.3.5'
);

create database if not exists `my_hive`.`trade_log`
as database `my_mysql`.`trade_log`
including all tables
/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;
-- 因为hive sink不支持变更数据,此处的hint会把原始cdc的变更数据转成成append流下发

同步到 DLC 示例

SET table.optimizer.mysql-cdc-source.merge.enabled = true;
SET table.optimizer.deterministic-operator-uid-for-cdas=true;

-- 注册mysql的catalog
create catalog `my_mysql` with (
'type' = 'jdbc',
'default-database' = 'testdb', -- 数据库名
'username' = '${username}', -- mysql用户名
'password' = '${password}', -- mysql密码
'base-url' = 'jdbc:mysql://ip:3306'
);

-- 整库同步
create database if not exists `my_dlc_database`
comment 'test db sync'
with (
'connector' = 'iceberg-inlong', -- 固定值
'catalog-database' = 'test', -- DLC内表所在的数据库名称
'catalog-table' = 'my_$tableName', -- DLC内表名称,$tableName会自动替换为要同步的表名,DLC表需要提前创建
'default-database' = 'test', -- DLC内表所在的数据库名称
'catalog-name' = 'HYBRIS', -- 固定值
'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog', -- 固定值
'qcloud.dlc.managed.account.uid' = '100026378089', -- 固定值,DLC管理账号的uid
'qcloud.dlc.secret-id' = '${secret_id}', -- DLC 用户的secretId从https://console.cloud.tencent.com/cam/capi中获取
'qcloud.dlc.secret-key' = '${secret_key}', -- DLC 用户的secretKey,从https://console.cloud.tencent.com/cam/capi中获取
'qcloud.dlc.region' = 'ap-guangzhou', -- DLC 所在地域,必须填ap-地域格式
'qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog&region=ap-guangzhou&data_engine_name=${engine_name}', -- DLC jdbc接入url,格式见https://cloud.tencent.com/document/product/1342/61547
'uri' = 'dlc.internal.tencentcloudapi.com', -- 固定值
'user.appid' = '${appid}', -- DLC 用户的 appid
'request.identity.token' = '100026378089' -- 固定值,DLC内表接入的token
)
as database `my_mysql`.`test` including table 'user.*'
excluding table 'user_info_20230530|user_behavior_tmp'
/* +`OPTIONS`(
'server-time-zone' = 'Asia/Shanghai',
'scan.newly-added-table.enabled' = 'true',
'scan.lastchunk.optimize.enable' = 'true'
*/
;

使用提醒

1. Flink 1.11 版本不支持整库同步(SQL)能力。
2. 目前只支持同步 MySQL 类型数据库作为整库同步的源表。
3. 目前同步到目标端时,除 Iceberg、Elasticsearch、Hudi 和 Hive(需要提前注册 Hive Catalog)作为目标表外,其它的目标端还不支持自动建表,需要事先在目标端中建立和 Mysql 库中数据表对应的表结构。
4. 推荐搭配 MySQL CDC Source 复用功能开启,一起使用,可以降低对数据库的压力。
5. CDAS 语法没有限制下游输出的类型,理论上可以同步到任意的下游类型。
6. 当同步的表的数量非常多的时候,flink 生成的单个 task 的 name 会非常长,导致 metric 系统占用大量的内存,影响作业稳定性,Oceanus 针对这种情况引入了 pipeline.task-name-length 参数来限制 taskName 的长度,能极大的提高作业稳定性和日志可读性。(适用 Flink-1.13 和 Flink-1.14 版本)。 可以在作业的配置中生效:
set pipeline.task-name-length=80;