Kafka 环境准备与数据库配置
前提条件
1. 版本:Kafka 版本0.11及以上。
2. 自建 Kafka:自建 Kafka 和集成资源组的网络需要打通。
使用限制
Kafka 版本0.11及以上。
注意事项
Kafka 客户端与服务端建立连接的过程如下所示:
1. 客户端使用您指定的 bootstrap.servers 地址连接 Kafka 服务端,Kafka 服务端根据配置向客户端返回集群中各台 broker 的元信息,包括各台 broker 的连接地址。
2. 客户端使用第一步 broker 返回的连接地址连接各台 broker 进行读取或写入:
我们需要注意当 bootstrap.servers 地址可以连通时,仍然报网络问题连通性问题时,您可以参考以下方式进行排查。
排查下 Kafka 服务端返回的 broker 连接地址是否连通性存在问题。
检查 Kafka broker 配置文件 server.properties中listeners 和 advertised.listeners 的地址是否可以和集成资源组网络连通。
数据源配置
目前支持通过连接串方式引入 Kafka 类型数据。
![](https://qcloudimg.tencent-cloud.cn/image/document/d74183aa7e81f559eb5e0664183842b0.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/d74183aa7e81f559eb5e0664183842b0.png)
参数说明如下:
参数 | 说明 |
数据源名称 | 新建的数据源的名称,由用户自定义且不可为空。命名以字母开头,可包含字母、数字、下划线。长度在20字符以内。 |
描述 | 选填,对本数据源的描述。 |
数据源权限 | 选择项目共享或仅个人与管理官可使用。 |
部署方式 | 数据来源于自建实例或公网实例。 |
区域和网络 | 选择账户下云数据库实例所在的地域、实例名称及 ID 信息。 |
Kafka 服务列表 | 请输入服务列表,例如 ip1:9092,ip2:9092。 |
Kafka 安全协议 | 请输入 Kafka 安全协议。 |
Kafka sasl 机制 | 请输入 Kafka sasl 机制。 |
Kafka sasl jaas 配置 | 请输入 Kafka sasl jaas 配置。 |
数据连通性 | 测试是否能够连通所配置的数据库。 说明: 若连通性测试不通过,会给出错误提示供排查原因,同时,数据源仍可保存,但该数据源使用时会发生异常。 |
Kafka 单表读取节点配置
条件及限制
Kafka 支持版本:
节点 | 版本 |
Kafka | 0.10+ |
创建 Kafka 节点
1. 在数据集成页面左侧目录栏单击实时同步。
2. 在实时同步页面上方选择单表同步新建(可选择表单和画布模式)并进入配置页面。
3. 单击左侧读取,单击选择 Kafka 节点并配置节点信息。
![](https://qcloudimg.tencent-cloud.cn/image/document/6d5d6169c39b9f47c98b07dfd1b63137.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/6d5d6169c39b9f47c98b07dfd1b63137.png)
4. 参数说明:
参数 | 描述 |
节点名称 | 输入 Kafka 节点名称。 |
数据源 | Kafka 读取端数据源类型支持 Kafka、CKafka。 |
topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持:canal-json、json、avro、csv。 |
消息类型 | append 消息:Kafka 内消息来源于 append 消息流,通常消息中不携带唯一键。写入节点建议搭配 append 写入模式。 upsert 消息:Kafka 内消息来源于 upsert 消息流,通常消息中携带唯一键,设置后消息可保证 Exactly-Once。写入节点建议搭配 upsert 写入模式。 |
读取位置 | 启动同步任务时开始同步数据的起始位点。 |
消费组 id | 请避免该参数与其他消费进程重复,以保证消费位点的正确性。如果不指定该参数,默认设定 group.id=WeData_ group_${任务id}。 |
5. 预览字段,单击保存。
Kafka 单表写入节点配置
创建 Kafka 节点
1. 在数据集成页面左侧目录栏单击实时同步。
2. 在实时同步页面上方选择单表同步新建(可选择表单和画布模式)并进入配置页面。
3. 单击左侧写入,单击选择 Kafka 节点并配置节点信息。
![](https://qcloudimg.tencent-cloud.cn/image/document/c95bbd6e55a787b71b2f5f45bfdfe441.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/c95bbd6e55a787b71b2f5f45bfdfe441.png)
4. 参数说明:
参数 | 描述 |
节点名称 | 输入 Kafka 节点名称。 |
数据源 | Kafka 写入端数据源类型支持 Kafka、Ckafka。 |
topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持:canal-json、json、avro、csv。 |
消息类型 | append 消息:Kafka 内消息来源于 append 消息流,通常消息中不携带唯一键。 写入节点建议搭配 append 写入模式 upsert 消息:Kafka 内消息来源于 upsert 消息流,通常消息中携带唯一键,设置后消息可保证 Exactly-Once。写入节点建议搭配 upsert 写入模式。 |
读取位置 | 启动同步任务时开始同步数据的起始位点。 |
消费组 id | 请避免该参数与其他消费进程重复,以保证消费位点的正确性。如果不指定该参数,默认设定 group.id=WeData_ group_${任务id}。 |
5. 预览字段,单击保存,即可创建 Kafka 节点。
Kafka 整库来源配置
条件及限制
支持 Kafka 版本详情下表所示:
节点 | 版本 |
Kafka | 0.10+ |
Kafka 读取配置参数说明
![](https://qcloudimg.tencent-cloud.cn/image/document/b7f399fa80388fb1af8435e06033cdb0.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/b7f399fa80388fb1af8435e06033cdb0.png)
参数 | 说明 |
数据源 | 选择需要同步的 Kafka 数据源。 |
来源 Topic | 选择或输入任务计划消费的 Topic 名称。 |
序列化格式 | 设置 Kafka 内原始消息格式,目前支持解析 canal 和 debezium。 说明:设置格式需与消息实际格式保持一致。 |
读取位置 | 设置 Kafka 数据读取位点: 从最早开始:earlist 从最新开始:latest 从指定时间开始:设定具体任务启动时间位点 |
高级设置(可选) | 可根据业务需求配置参数。 |
整库同步至 Kafka 配置详情
背景信息及特性支持
支持 MySQL、TDSQL-C MySQL、PostgreSQL、Mongo 内整个实例或库表数据实时同步至 Kafka 中:
MySQL、TDSQL-C MySQL、PostgreSQL 等来源端支持 DDL 变更监控,支持实例、库、表级数据变更监控。
Kafka 目标端支持 Topic 自动创建,支持指定 Partition 分区策略。
条件与限制
如需使用自动创建 Topic 能力,请提前在 Kafka 服务端设置:
auto.create.topics.enable=true
开启自动创建 Topic 功能后,目标 Topic 需遵守 CKafka/kafka Topic 命名规则,以防止任务运行时 Topic 创建失败。
Kafka 开启自动创建 Topic 时,请合理配置好分区数,避免造成性能问题。
操作步骤
步骤一:创建整库同步任务
进入配置中心 > 实时同步任务页面后,单击新建整库迁移任务。
步骤二:链路选择
在首页卡片中选择同步至 Kafka 目标端的链路。
![](https://qcloudimg.tencent-cloud.cn/image/document/247f377d33e3eae01439ec9b2aa5735a.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/247f377d33e3eae01439ec9b2aa5735a.png)
步骤三:数据来源设置
步骤四:数据目标设置
![](https://qcloudimg.tencent-cloud.cn/image/document/1140a01da467d4ba9475329a70be79cc.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/1140a01da467d4ba9475329a70be79cc.png)
参数 | 说明 |
数据源 | 选择需要同步的目标数据源 |
序列化格式 | 支持 canal-json 和 debezium 两种格式 |
同步至多 Topic | 默认打开:此选项下可实现来源数据与目标 Topic 多对多映射,任务执行过程中将根据策略匹配对应 Topic 的名称。 关闭:手动输入或者选择目标 Topic 名称,后续所有数据将统一写入该 Topic 内。 |
支持自动创建 Topic | 打开后,若 Topic 不存在时系统将根据 Topic 名匹配规则自动创建 Topic。 注意: 此功能需保证 Kafka 已开启自动创建,请提前在 Kafka 服务端设定:
|
Topic 匹配策略 | 与来源表同名:默认使用与来源表同名的 Topic 。 自定义:根据定义策略规则匹配 Topic 。 |
分区规则 | 配置 topic partition 分区映射(轮询写入分区、根据表名分区、根据来源表主键分区): 轮询写入分区:轮询(Round Robin)上游数据写入到每个 partition。 根据表名写入分区:根据上游数据中的表名hash映射写入每个 partition。 根据来源表主键分区:根据上游数据中的主键数据内容 hash 映射写入每个 partition。 指定分区: 写入指定单分区:输入分区序号,所有消息仅写入到固定分区。 根据数据源写入多分区:同一行设置的数据源将统一写入对应分区: 数据源:数据源范围为所有来源端配置的数据源名称,支持多选,同行内,已选数据源不可重复选择。 分区号:输入分区序号。 支持新增/删除管理。 根据表规则写入多分区:支持输入库、表正则进行对象匹配,符合匹配规则的对象写入到指定分区中,规则之间顺序执行,已匹配库表不参与后续规则匹配。 自定义:支持使用 “内置参数” 拼接写入分区规则,设定后将根据分区规则对应的值对消息进行 hash 分区。 ![]() |
步骤五:配置运行资源
![](https://qcloudimg.tencent-cloud.cn/image/document/3710be32387632c7daf527a2aa9293a7.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/3710be32387632c7daf527a2aa9293a7.png)
脏数据策略:提供 COS 归档和不归档两种方案。
COS 归档:将无法写入的脏数据进行归档,需要配置 COS 数据源、存储桶、存储目录、内容分隔符及换行符。
不归档:不需要做其他操作。
步骤六:配置预览及任务提交
![](https://qcloudimg.tencent-cloud.cn/image/document/634679cb00cddfcec864c1bb53dd5705.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/634679cb00cddfcec864c1bb53dd5705.png)
序号 | 参数 | 说明 |
1 | 提交 | 将当前任务提交至生产环境,提交时根据当前任务是否有生产态任务可选择不同运行策略。 若当前任务无生效的线上任务,即首次提交或线上任务处于“失败”状态,可直接提交。 若当前任务存在“运行中”或“暂停”状态的线上任务需选择不同策略。停止线上作业将抛弃之前任务运行位点,从头开始消费数据,保留作业状态将在重启后从之前最后消费位点继续运行。 ![]() 说明: 单击立即启动任务将在提交后立即开始运行,否则需要手动触发才会正式运行。 |
2 | 锁定/解锁 | 默认创建者为首个持锁者,仅允许持锁者编辑任务配置及运行任务。若锁定者5分钟内没有编辑操作,其他人可点击图标抢锁,抢锁成功可进行编辑操作。 |
3 | 前往运维 | 根据当前任务名称快捷跳转至任务运维页面。 |
4 | 保存 | 预览完成后,可单击保存按钮,保存整库任务配置。仅保存的情况下,任务将不会提交至运维中心。 |
任务提交检测
![](https://qcloudimg.tencent-cloud.cn/image/document/828a2364efe5b98abfc1b2d7b105170c.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/828a2364efe5b98abfc1b2d7b105170c.png)
参数 | 说明 |
检测存在异常 | 支持跳过异常直接提交,或者终止提交。 |
检测仅存在警告及以下 | 可直接提交。 |
提交结果
![](https://qcloudimg.tencent-cloud.cn/image/document/a48fecc8cf259db46a94708562d56818.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/a48fecc8cf259db46a94708562d56818.png)
任务提交中:
展示提交进度百分比。
提示用户勿刷新/关闭页面,文案:当前任务已提交成功,可前往运维进行任务状态及数据管理。
任务提交结果-成功:
展示任务提交成功结果。
提示成功及后续跳转:文案 “提交成功,10秒后将跳转至当前任务运维详情页面” “当前任务已提交成功,可前往运维进行任务状态及数据管理”。
展示任务提交失败原因:
失败原因返回。
后续步骤
附录:Canal-json/Debezium 数据格式样例
Canal-json
{"data": [{"id": "2","name": "scooter33","description": "Big 2-wheel scooter233","weight": "5.11"}],"database": "pacino99","es": 1589373560000,"id": 9,"isDdl": false,"mysqlType": {"id": "INTEGER","name": "VARCHAR(255)","description": "VARCHAR(512)","weight": "FLOAT"},"old": [{"weight": "5.12"}],"pkNames": ["id"],"sql": "","sqlType": {"id": 4,"name": 12,"description": 12,"weight": 7},"table": "products999","ts": 1589373560798,"type": "UPDATE"}
Debezium
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "mysql-server-1.inventory.customers.Envelope"},"payload": {"op": "c","ts_ms": 1465491411815,"before": null,"after": {"id": 12003,"first_name": "Anne322","last_name": "Kretchmar3222","email": "annek@noanswer.org3222"},"source": {"version": "1.9.6.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 0,"snapshot": false,"db": "inventory333","table": "customers433","server_id": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"thread": 7,"query": ""}}}
Kafka 日志采集写入节点配置
![](https://qcloudimg.tencent-cloud.cn/image/document/53d283ed115a95aefc829a1637fb7f00.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/53d283ed115a95aefc829a1637fb7f00.png)
参数 | 说明 |
数据源 | 选择当前项目中可用的 Kafka 数据源,Kafka 写入端数据源类型支持 Kafka、Ckafka 。 |
topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持三种类型: canal-json json avro |
写入模式 | Kafka 支持两种写入模式: append:追加写入。 upsert:以 upsert 方式插入消息,设置后消息仅只能被消息端处理一次以保证 Exactly-Once。 |
唯一键 | Upsert 写入模式下,需设置唯一键保证数据有序性,支持多选,Append 模式则不需要设置唯一键。 |
高级设置(可选) | 可根据业务需求配置参数。 |