Kafka 数据源

最近更新时间:2024-04-23 17:45:01

我的收藏

Kafka 环境准备与数据库配置

前提条件

1. 版本:Kafka 版本0.11及以上。
2. 自建 Kafka:自建 Kafka 和集成资源组的网络需要打通。

使用限制

Kafka 版本0.11及以上。

注意事项

Kafka 客户端与服务端建立连接的过程如下所示:
1. 客户端使用您指定的 bootstrap.servers 地址连接 Kafka 服务端,Kafka 服务端根据配置向客户端返回集群中各台 broker 的元信息,包括各台 broker 的连接地址。
2. 客户端使用第一步 broker 返回的连接地址连接各台 broker 进行读取或写入:
我们需要注意当 bootstrap.servers 地址可以连通时,仍然报网络问题连通性问题时,您可以参考以下方式进行排查。
排查下 Kafka 服务端返回的 broker 连接地址是否连通性存在问题。
检查 Kafka broker 配置文件 server.properties中listeners 和 advertised.listeners 的地址是否可以和集成资源组网络连通。

数据源配置

目前支持通过连接串方式引入 Kafka 类型数据。



参数说明如下:
参数
说明
数据源名称
新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。
描述
选填,对本数据源的描述。
数据源权限
选择项目共享或仅个人与管理官可使用。
部署方式
数据来源于自建实例或公网实例。
区域和网络
选择账户下云数据库实例所在的地域、实例名称及 ID 信息。
Kafka 服务列表
请输入服务列表,例如 ip1:9092,ip2:9092。
Kafka 安全协议
请输入 Kafka 安全协议。
Kafka sasl 机制
请输入 Kafka sasl 机制。
Kafka sasl jaas 配置
请输入 Kafka sasl jaas 配置。
数据连通性
测试是否能够连通所配置的数据库。
说明:
若连通性测试不通过,会给出错误提示供排查原因,同时,数据源仍可保存,但该数据源使用时会发生异常。

Kafka 单表读取节点配置

条件及限制

Kafka 支持版本:
节点
版本
Kafka
0.10+

创建 Kafka 节点

1. 在数据集成页面左侧目录栏单击实时同步
2. 在实时同步页面上方选择单表同步新建(可选择表单和画布模式)并进入配置页面。
3. 单击左侧读取,单击选择 Kafka 节点并配置节点信息。



4. 参数说明:
参数
描述
节点名称
输入 Kafka 节点名称。
数据源
Kafka 读取端数据源类型支持 Kafka、CKafka。
topic
Kafka 数据源中的 Topic。
序列化格式
Kafka 消息序列化格式类型,支持:canal-json、json、avro、csv。
消息类型
append 消息:Kafka 内消息来源于 append 消息流,通常消息中不携带唯一键。写入节点建议搭配 append 写入模式。
upsert 消息:Kafka 内消息来源于 upsert 消息流,通常消息中携带唯一键,设置后消息可保证 Exactly-Once。写入节点建议搭配 upsert 写入模式。
读取位置
启动同步任务时开始同步数据的起始位点。
消费组 id
请避免该参数与其他消费进程重复,以保证消费位点的正确性。如果不指定该参数,默认设定 group.id=WeData_ group_${任务id}。
5. 预览字段,单击保存

Kafka 单表写入节点配置

创建 Kafka 节点

1. 在数据集成页面左侧目录栏单击实时同步
2. 在实时同步页面上方选择单表同步新建(可选择表单和画布模式)并进入配置页面。
3. 单击左侧写入,单击选择 Kafka 节点并配置节点信息。



4. 参数说明:
参数
描述
节点名称
输入 Kafka 节点名称。
数据源
Kafka 写入端数据源类型支持 Kafka、Ckafka。
topic
Kafka 数据源中的 Topic。
序列化格式
Kafka 消息序列化格式类型,支持:canal-json、json、avro、csv。
消息类型
append 消息:Kafka 内消息来源于 append 消息流,通常消息中不携带唯一键。
写入节点建议搭配 append 写入模式 upsert 消息:Kafka 内消息来源于 upsert 消息流,通常消息中携带唯一键,设置后消息可保证 Exactly-Once。写入节点建议搭配 upsert 写入模式。
读取位置
启动同步任务时开始同步数据的起始位点。
消费组 id
请避免该参数与其他消费进程重复,以保证消费位点的正确性。如果不指定该参数,默认设定 group.id=WeData_ group_${任务id}。
5. 预览字段,单击保存,即可创建 Kafka 节点。

Kafka 整库来源配置

条件及限制

支持 Kafka 版本详情下表所示:
节点
版本
Kafka
0.10+

Kafka 读取配置参数说明




参数
说明
数据源
选择需要同步的 Kafka 数据源。
来源 Topic
选择或输入任务计划消费的 Topic 名称。
序列化格式
设置 Kafka 内原始消息格式,目前支持解析 canal 和 debezium。
说明:设置格式需与消息实际格式保持一致。
读取位置
设置 Kafka 数据读取位点:
从最早开始:earlist
从最新开始:latest
从指定时间开始:设定具体任务启动时间位点
高级设置(可选)
可根据业务需求配置参数。

整库同步至 Kafka 配置详情

背景信息及特性支持

支持 MySQL、TDSQL-C MySQL、PostgreSQL、Mongo 内整个实例或库表数据实时同步至 Kafka 中:
MySQL、TDSQL-C MySQL、PostgreSQL 等来源端支持 DDL 变更监控,支持实例、库、表级数据变更监控。
Kafka 目标端支持 Topic 自动创建,支持指定 Partition 分区策略。

条件与限制

如需使用自动创建 Topic 能力,请提前在 Kafka 服务端设置:
auto.create.topics.enable=true
开启自动创建 Topic 功能后,目标 Topic 需遵守 CKafka/kafka Topic 命名规则,以防止任务运行时 Topic 创建失败。
Kafka 开启自动创建 Topic 时,请合理配置好分区数,避免造成性能问题。

操作步骤

步骤一:创建整库同步任务

进入配置中心 > 实时同步任务页面后,单击新建整库迁移任务。

步骤二:链路选择

在首页卡片中选择同步至 Kafka 目标端的链路。




步骤三:数据来源设置

步骤四:数据目标设置




参数
说明
数据源
选择需要同步的目标数据源
序列化格式
支持 canal-json 和 debezium 两种格式
同步至多 Topic
默认打开:此选项下可实现来源数据与目标 Topic 多对多映射,任务执行过程中将根据策略匹配对应 Topic 的名称。
关闭:手动输入或者选择目标 Topic 名称,后续所有数据将统一写入该 Topic 内。
支持自动创建 Topic
打开后,若 Topic 不存在时系统将根据 Topic 名匹配规则自动创建 Topic。
注意:
此功能需保证 Kafka 已开启自动创建,请提前在 Kafka 服务端设定:
auto.create.topics.enable=true
Topic 匹配策略
与来源表同名:默认使用与来源表同名的 Topic 。
自定义:根据定义策略规则匹配 Topic 。
分区规则
配置 topic partition 分区映射(轮询写入分区、根据表名分区、根据来源表主键分区):
轮询写入分区:轮询(Round Robin)上游数据写入到每个 partition。
根据表名写入分区:根据上游数据中的表名hash映射写入每个 partition。
根据来源表主键分区:根据上游数据中的主键数据内容 hash 映射写入每个 partition。
指定分区
写入指定单分区:输入分区序号,所有消息仅写入到固定分区。
根据数据源写入多分区:同一行设置的数据源将统一写入对应分区:
数据源:数据源范围为所有来源端配置的数据源名称,支持多选,同行内,已选数据源不可重复选择。
分区号:输入分区序号。
支持新增/删除管理。
根据表规则写入多分区:支持输入库、表正则进行对象匹配,符合匹配规则的对象写入到指定分区中,规则之间顺序执行,已匹配库表不参与后续规则匹配。
自定义:支持使用 “内置参数” 拼接写入分区规则,设定后将根据分区规则对应的值对消息进行 hash 分区。




步骤五:配置运行资源




脏数据策略:提供 COS 归档和不归档两种方案。
COS 归档:将无法写入的脏数据进行归档,需要配置 COS 数据源、存储桶、存储目录、内容分隔符及换行符。
不归档:不需要做其他操作。

步骤六:配置预览及任务提交




序号
参数
说明
1
提交
将当前任务提交至生产环境,提交时根据当前任务是否有生产态任务可选择不同运行策略。
若当前任务无生效的线上任务,即首次提交或线上任务处于“失败”状态,可直接提交。
若当前任务存在“运行中”或“暂停”状态的线上任务需选择不同策略。停止线上作业将抛弃之前任务运行位点,从头开始消费数据,保留作业状态将在重启后从之前最后消费位点继续运行。



说明:
单击立即启动任务将在提交后立即开始运行,否则需要手动触发才会正式运行。
2
锁定/解锁
默认创建者为首个持锁者,仅允许持锁者编辑任务配置及运行任务。若锁定者5分钟内没有编辑操作,其他人可点击图标抢锁,抢锁成功可进行编辑操作。
3
前往运维
根据当前任务名称快捷跳转至任务运维页面。
4
保存
预览完成后,可单击保存按钮,保存整库任务配置。仅保存的情况下,任务将不会提交至运维中心。

任务提交检测




参数
说明
检测存在异常
支持跳过异常直接提交,或者终止提交。
检测仅存在警告及以下
可直接提交。

提交结果




任务提交中:
展示提交进度百分比。
提示用户勿刷新/关闭页面,文案:当前任务已提交成功,可前往运维进行任务状态及数据管理。
任务提交结果-成功:
展示任务提交成功结果。
提示成功及后续跳转:文案 “提交成功,10秒后将跳转至当前任务运维详情页面” “当前任务已提交成功,可前往运维进行任务状态及数据管理”。
展示任务提交失败原因:
失败原因返回。

后续步骤

完成任务配置后,您可以对已创建的任务进行运维及监控告警,如对任务配置监控报警,并查看任务运行的关键指标等。详情请参见 实时任务运维

附录:Canal-json/Debezium 数据格式样例

Canal-json
{
"data": [
{
"id": "2",
"name": "scooter33",
"description": "Big 2-wheel scooter233",
"weight": "5.11"
}
],
"database": "pacino99",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.12"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products999",
"ts": 1589373560798,
"type": "UPDATE"
}
Debezium
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory2.customers2.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory2.customers2.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope"
},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": 12003,
"first_name": "Anne322",
"last_name": "Kretchmar3222",
"email": "annek@noanswer.org3222"
},
"source": {
"version": "1.9.6.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory333",
"table": "customers433",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": ""
}
}
}

Kafka 日志采集写入节点配置




参数
说明
数据源
选择当前项目中可用的 Kafka 数据源,Kafka 写入端数据源类型支持 Kafka、Ckafka 。
topic
Kafka 数据源中的 Topic。
序列化格式
Kafka 消息序列化格式类型,支持三种类型:
canal-json
json
avro
写入模式
Kafka 支持两种写入模式:
append:追加写入。
upsert:以 upsert 方式插入消息,设置后消息仅只能被消息端处理一次以保证 Exactly-Once。
唯一键
Upsert 写入模式下,需设置唯一键保证数据有序性,支持多选,Append 模式则不需要设置唯一键。
高级设置(可选)
可根据业务需求配置参数。