DLC 数据源

最近更新时间:2024-04-23 17:45:01

我的收藏

DLC 环境准备与数据库配置

使用限制

选择 upsert 模式同步时,DLC 表的表必须是 V2表,并且开启设置 write.upsert.enabled=true。
选择 upsert 模式同步时,必须指定主键,如果是一个分区表,分区字段也必须添加到主键中。




数据库环境准备

检查 WeData 是否有访问 DLC 的权限

1. 登录腾讯云控制台,进入 访问管理(CAM)页面。
2. 在左侧菜单中选择用户,找到您需要绑定策略的用户。
3. 单击该用户的名称,进入用户详情页。在权限栏搜索 QcloudWeDataExternalAccess,如果已经关联了该策略,下面步骤可以跳过。



4. 在用户详情页中,找到权限管理模块下的关联策略
5. 单击关联策略中的新建关联按钮。选择从策略列表中选取策略关联,并输入 QcloudWeDataExternalAccess
6. 勾选 QcloudWeDataExternalAccess,单击下一步按钮,完成策略绑定。



7. 绑定成功后,该用户即可使用 QcloudWeDataExternalAccess 策略中定义的权限。

检查表是 V1表还是 V2表

实时同步到 DLC,支持两种模式:Append 模式 Upsert 模式。其中 Upsert 模式必须为 V2表。使用下面命令查看创建的表属性。
SHOW TBLPROPERTIES `DataLakeCatalog`.`databases_name`.`table_name`;
返回结果如下,format-version = 2 表示创建的是 V2表。如果您创建的 V1表,想改成 V2表,可参考 修改表属性




创建 DLC 表

1. 创建 V1表
DLC 默认创建的表为 V1表,V1表不支持 Upsert 模式。建表语句参考下面:
CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dbname`.`test_v1` (`id` int, `name` string, `ts` date) PARTITIONED BY (`ts`);
2. 创建 V2表
使用 upsert 模式的写入 DLC,需要创建 V2表,需要在建立表的时候指定,建表语句参考下面:
创建 V2表,也一定要设置 'write.upsert.enabled' = 'true' 否则仍然是 Append 模式。
CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dbname`.`test_v2` (`id` int, `name` string, `ts` date) PARTITIONED BY (`ts`) TBLPROPERTIES (
'format-version' = '2', -- 创建 V2 表
'write.upsert.enabled' = 'true', -- 写入时做 upsert 操作,只支持 V2 表
'write.distribution-mode' = 'hash', -- 定义写入数据的分布,设置为hash,支持多并发写入
'write.update.mode' = 'merge-on-read' -- 写入更新模式,在写入的时候做merge操作,只支持 V2 表
)
3.
修改表属性(可选)

对于已经创建了的表,需要修改其属性,可以参考下面语法:
SHOW TBLPROPERTIES table_name [('property_name')]
更多 DLC SQL 语法,可以参考 SQL 语法概览
下面是将已经存在的 V1表改成 V2表的示例:
ALTER TABLE
`DataLakeCatalog`.`database_name`.`table_name`
SET
TBLPROPERTIES (
'format-version' = '2',
'write.upsert.enabled' = 'true'
)

数据源配置

WeData 目前支持通过连接串方式引入 DLC 数据源。
单击项目管理 > 数据源管理 > 新建数据源 > 选择 DLC 数据源。



参数说明如下:
参数
说明
数据源名称
新建数据源的标识名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线,长度在20字符以内。类似唯一标识,不支持修改
显示名
新建数据源的显示名称,由用户自定义且支持修改。显示名可选填,不填默认显示数据源名称。
描述
选填,对本数据源的描述
数据源权限
项目共享表示当前数据源项目所有成员均可使用 ,仅个人和管理员表示改数据源仅创建人和项目管理员可用
JDBC URL
用于连接 DLC 数据源的连接串信息
数据连通性
测试是否能够连通所配置的数据库
DLC 权限范围操作指南
1. 请在访问管理 > 用户 > 用户列表 页面中,单击新建用户并开始创建新用户(即子账号)。

2. 按创建用户页面流程创建用户:填写用户名、选择标签键与标签值,即可完成新用户的创建。

3. 访问管理 > 策略页面中,给制定的角色或子账号设置对应 DLC 数据策略赋予权限。




4. 申请密钥:若当无子账号时,可引导并跳转到创建子账号对应 API 密钥页面进行创建密钥,如下图:

5. DLC 平台 CAM 权限增加 DLC 策略权限,详情请参见 访问管理 CAM 服务


DLC 单表写入节点配置

配置 DLC 节点

1. 在数据集成页面左侧目录栏单击实时同步
2. 在实时同步页面上方选择单表同步新建(可选择表单和画布模式)并进入配置页面。
3. 单击左侧写入,单击选择 DLC 节点并配置节点信息。



4. 您可以参考下表进行参数信息配置。
参数
说明
数据源
需要写入的 DLC 数据源。
支持选择、或者手动输入需写入的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
支持选择、或者手动输入需写入的表名称。
当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。
写入模式
DLC 实时同步写入支持两种模式:
append:追加写入,主键冲突时报错。
upsert:更新写入,主键冲突时更新数据。
唯一键
Upsert写入模式下,需设置唯一键保证数据有序性,支持多选。
高级设置
可根据业务需求配置参数。
5. 预览数据字段并与读取节点配置字段映射,单击保存

整库同步至 DLC 配置详情

背景信息及特性支持

支持 MySQL、TDSQL-C MySQL、Kafka 内整个实例或库表数据实时同步至 DLC 原生表或外部表中:
MySQL、TDSQL-C MySQL 支持 DDL 变更监控,MySQL 数据源支持实例、库、表级数据变更监控。
Kafka 支持拆分消息内容动态匹配目标 DLC 表。
DLC 目标端支持根据源端表、及字段变更自动创建库、自动创建表、自动创建字段。
DLC 目标端支持小文件合并。

条件与限制

1. 仅支持同步 DLC iceberg 表,需使用 v2表设置表属性:{ "format-version", "2"}。
2. DLC 对来源端的 DDL 变更响应:
仅支持同步新增列操作,且暂不支持在中间增加列。
暂不支持同步删除列、列名变更、列类型变更、列位置变更等操作。
自动建表时,对于 Mysql 历史表中的 bool 会映射成 int 类型,对于新增表中的 bool 会映射成 bool 类型。
3. 实时同步任务开启 DLC 小文件合并以后,将会消耗 DLC 计算资源。

操作步骤

步骤一:创建整库同步任务

进入配置中心 > 实时同步任务页面后,单击新建整库迁移任务。

步骤二:链路选择

在首页卡片中选择同步至 DLC 目标端的链路。




步骤三:数据来源设置

步骤四:数据目标设置




参数
说明
数据源
选择需要同步的目标数据源。
写入模式
upsert:更新方式写入目标表。此方式下要求目标表中已设置主键:
任务将默认使用主键作为唯一键进行记录更新。
若表无主键则 append 写入。
append:追加模式写入数据表。
库/表匹配策略
DLC 中数据库以及数据表对象的名称匹配规则。
自动创建库表
默认开启。当目标 DLC 不存在符合库/表匹配策略的目标对象时,系统将自动在 DLC 内创建库表以接收来源库表的数据。
说明:
此能力要求 DLC 已开启自动创建库表能力。
自动建表类型
原生表:即 DLC 内部表(lceberg)。
外部表:外部表需手动指定建 COS 路径,COS 路径需以 cosn:// 开头。
小文件合并
默认关闭。开启后,将对实时同步过程中产生的小文件将根据 checkpoint 周期进行自动合并。小文件合并过程中将会消耗 DLC 计算资源,并需要输入合并频率。
高级设置
可根据业务需求配置参数。

步骤五:配置运行资源和策略

DLC 整库同步任务提供任务级运行资源及数据失败写入处理策略。其中数据写入失败处理策略支持三种:



策略名称
策略说明
默认策略
任意表写入异常时,所有表终止写入,任务将失败
部分停止
部分表写入异常时,仅停止该表数据写入,其他表正常同步。已停止的表不可在本次任务运行期间恢复写入。
异常重启
部分表写入异常时,所有表均暂停写入。此策略下任务将持续重启直到所有表正常同步,重启期间可能导致部分表数据重复写入
忽略异常
忽略表内无法写入的异常数据并标记为脏数据。该表的其他数据、以及任务内的其他表正常同步。脏数据提供 COS 归档和不归档两种方案。
COS 归档:将无法写入的脏数据进行归档,需要配置 COS 数据源、存储桶、存储目录、内容分隔符及换行符。
不归档:不需要做其他操作。
场景示例:
任务Task1下计划同步50张表,任务运行过程中表A内出现新增字段或字段类型变更:
默认策略:表A任务运行后新增了一个字段"DEMO"。此策略下,任务将在 DLC 端的目标表A内新建字段"DEMO"后同步数据。期间,其余49张表数据正常同步。
部分停止表A任务运行后将字段"DEMO"的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下,任务将在停止源端表A的数据读取,后续任务仅同步其余49张表至目标端。
异常重启表A任务运行后将字段"DEMO"的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下任务将在持续重启,期间任务内配置的所有50张表将暂停数据写入,直到表A字段纠正。
忽略异常表 A 任务运行后将字段 "DEMO" 的进行了字段类型变更,并且变更后字段类型与目标端字段类型无法匹配写入。此策略下任务将忽略无法写入的异常数据,并标记为脏数据,表内其他数据正常同步。

步骤六:配置预览及任务提交




序号
参数
说明
1
提交
将当前任务提交至生产环境,提交时根据当前任务是否有生产态任务可选择不同运行策略。
若当前任务无生效的线上任务,即首次提交或线上任务处于“失败”状态,可直接提交。
若当前任务存在“运行中”或“暂停”状态的线上任务需选择不同策略。停止线上作业将抛弃之前任务运行位点,从头开始消费数据,保留作业状态将在重启后从之前最后消费位点继续运行。



说明:
单击立即启动任务将在提交后立即开始运行,否则需要手动触发才会正式运行。
2
锁定/解锁
默认创建者为首个持锁者,仅允许持锁者编辑任务配置及运行任务。若锁定者5分钟内没有编辑操作,其他人可单击图标抢锁,抢锁成功可进行编辑操作。
3
前往运维
根据当前任务名称快捷跳转至任务运维页面。
4
保存
预览完成后,可单击保存按钮,保存整库任务配置。仅保存的情况下,任务将不会提交至运维中心。

任务提交检测




参数
说明
检测存在异常
支持跳过异常直接提交,或者终止提交。
检测仅存在警告及以下
可直接提交。

提交结果




任务提交中:
展示提交进度百分比。
提示用户勿刷新/关闭页面,文案:当前任务已提交成功,可前往运维进行任务状态及数据管理。
任务提交结果-成功:
展示任务提交成功结果。
提示成功及后续跳转:文案 “提交成功,10秒后将跳转至当前任务运维详情页面” “当前任务已提交成功,可前往运维进行任务状态及数据管理”。
展示任务提交失败原因:
失败原因返回。

后续步骤

完成任务配置后,您可以对已创建的任务进行运维及监控告警,如对任务配置监控报警,并查看任务运行的关键指标等。详情请参见 实时任务运维

数据类型转换支持

写入

内部类型
Iceberg 类型
CHAR
STRING
VARCHAR
STRING
STRING
STRING
BOOLEAN
BOOLEAN
BINARY
FIXED(L)
VARBINARY
BINARY
DECIMAL
DECIMAL(P,S)
TINYINT
INT
SMALLINT
INT
INTEGER
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DATE
DATE
TIME
TIME
TIMESTAMP
TIMESTAMP
TIMESTAMP_LTZ
TIMESTAMPTZ
INTERVAL
-
ARRAY
LIST
MULTISET
MAP
MAP
MAP
ROW
STRUCT
RAW
-

常见问题

1. 同步增量数据( changlog 数据)到 dlc 时报错

错误信息:



问题原因:
dlc 的表是 v1表不支持 changlog 数据。
解决方法:
1. 修改 dlc 表为 v2表和开启 upsert 支持。
2. 修改表支持 upsert:ALTER TABLE tblname SET TBLPROPERTIES ('write.upsert.enabled'='true')。
3. 修改表为 v2表:ALTER TABLE tblname SET TBLPROPERTIES ('format-version'='2')。
显示表设置的属性是否成功 show tblproperties tblname。

2. Cannot write incompatible dataset to table with schema

错误详情:
Caused by: java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
* mobile should be required, but is optional
at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:364)
at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:323)
错误原因:
用户建 dlc 表时,字段设置了 NOT NULL 约束。
解决办法:
建表时不要设置 not null 约束。

3. 同步 mysql2dlc,报数组越界

问题详情:
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.flink.table.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:736) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:726) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.data.binary.BinarySegmentUtils.readTimestampData(BinarySegmentUtils.java:1022) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.data.binary.BinaryRowData.getTimestamp(BinaryRowData.java:356) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$39385f9c$1(RowData.java:260) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
错误原因:
时间字段做主键导致的问题。
解决方法:
1. 不用时间字段做主键。
2. 用户还是需要时间字段保证唯一性,建议用户在 dlc 新增一个冗余字符串字段,将上游的时间字段使用函数转换后映射到冗余字段。

4. DLC 任务报不是一个 iceberg 表

错误信息:



错误原因:
1. 可以让用户在 dlc 执行语句查看表到底是什么类型。
2. 语句:desc formatted 表名。
3. 可以看表类型。
解决办法:
选择正确的引擎建 iceberg 类型的表。

5. flink sql 字段顺序跟 dlc 目标表字段不一致导致报错

解决方法:
不修改任务表字段顺序。