有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持 Sink
1.14
不支持
1.16
不支持

使用范围

可以作为 Sink 使用。目前支持写入 DLC 托管的原生表。

DDL 定义

CREATE TABLE `eason_internal_test`(
`name` STRING,
`age` INT
) WITH (
'connector' = 'dlc-inlong',
'catalog-database' = 'test',
'catalog-table' = 'eason_internal_test',
'default-database' = 'test',
'catalog-name' = 'HYBRIS',
'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog',
'qcloud.dlc.secret-id' = '12345asdfghASDFGH',
'qcloud.dlc.secret-key' = '678910asdfghASDFGH',
'qcloud.dlc.region' = 'ap-guangzhou',
'qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog&region=ap-guangzhou&data_engine_name=dailai_test',
'qcloud.dlc.managed.account.uid' = '100026378089',
'request.identity.token' = '100026378089',
'user.appid' = '1257058945',
'uri' = 'dlc.internal.tencentcloudapi.com'
);

WITH 参数

通用参数

参数值
必填
默认值
描述
connector
connector 类型,必须填 dlc-inlong
catalog-database
DLC 内表所在的数据库名称
catalog-table
DLC 内表名称
default-database
DLC 内表所在的数据库名称
catalog-name
catalog 名称,必须填 HYBRIS
catalog-impl
catalog的 实现类,必须填 org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog
qcloud.dlc.managed.account.uid
DLC 管理账号的 uid,此处固定填写 100026378089
qcloud.dlc.secret-id
DLC 用户的 secretId,从 https://console.cloud.tencent.com/cam/capi 中获取
qcloud.dlc.secret-key
DLC 用户的 secretKey,从 https://console.cloud.tencent.com/cam/capi 中获取
qcloud.dlc.region
DLC 所在地域,必须填 ap-地域 格式
qcloud.dlc.jdbc.url
DLC jdbc 接入 url,格式见 JDBC 访问
uri
DLC 接入 uri,必须填 dlc.internal.tencentcloudapi.com
user.appid
DLC 用户的 appid
request.identity.token
DLC 内表接入的 token,此处固定填写 100026378089
sink.ignore.changelog
是否忽略 delete 数据,默认为 false,设为 true 则进入 append mode

DLC 表配置

Upsert 模式
-- DLC 建表语句
CREATE TABLE `bi_sensor`(
`uuid` string,
`id` string,
`type` string,
`project` string,
`properties` string,
`sensors_id` string,
`time` int,
`hour` int) PARTITIONED BY (`time`);
-- 将目标表设为 v2 表,允许 upsert
ALTER TABLE `bi_sensor` SET TBLPROPERTIES ('format-version'='2','write.metadata.delete-after-commit.enabled' = 'true', 'write.metadata.previous-versions-max' = '100', 'write.metadata.metrics.default' = 'full', 'write.upsert.enabled'='true', 'write.distribution-mode'='hash');

-- oceanus sink DDL,dlc 的主键和分区字段必须在 flink 定义的主键字段中
create table bi_sensors (
`uuid` STRING,
`id` STRING,
`type` STRING,
`project` STRING,
`properties` STRING,
`sensors_id` STRING,
`time` int,
`hour` int,
PRIMARY KEY (`uuid`, `time`) NOT ENFORCED
) with (...)
注意:
若您写入的 DLC 是基于元数据加速桶的,需要参考如下操作流程。

基于元数据加速桶的 DLC 操作流程

客户为 Oceanus 集群设置元数据加速桶的 DLC 权限

1. 进入 数据湖计算 DLC 控制台,单击全局配置 > 存储配置 > 元数据加速桶配置
2. 选择需要写入的托管桶,单击配置
3. 在“腾讯云产品绑定”中选择产品“流计算 Oceanus”,并选择相应集群资源,单击保存




客户为作业添加配置后启动作业

首先将下列 jar 包添加到依赖文件,并在作业参数中引用:



随后在作业高级参数中添加如下参数:
flink.hadoop.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
flink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
flink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/
flink.hadoop.fs.ofs.upload.flush.flag: true
flink.hadoop.fs.ofs.user.appid: 客户的 appid
flink.hadoop.fs.cosn.trsf.fs.ofs.user.appid: 客户的 appid
flink.hadoop.fs.cosn.trsf.fs.ofs.bucket.region: cos 所在地域(例如:弗吉尼亚对应 na-ashburn,广州对应 ap-guangzhou)
flink.hadoop.fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs

# 注意:使用 hadoop 用户权限则添加下面参数,使用 flink 用户无需添加
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop