有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

MySQL Binlog 是记录 MySQL 所有数据变动的二进制日志文件,用于 MySQL 主从复制和数据恢复。此外开发者还可以将订阅 MySQL Binlog 应用在增量索引缓存一致性基于数据的任务分发记录数据变更等场景。



本文以 MySQL Binlog 接入 CKafka,并从 CKafka 流出到 CLS 为例,讲解如何使用 CKafka 连接器实现 MySQL 数据变更记录分析。

运行原理

CKafka 连接器通过 Binlog 同步组件订阅 MySQL Binlog,然后将数据变更记录转化为 JSON 格式的消息生产到 CKafka 中。

前提条件

需要开通云数据库 MySQL开启 Binlog,并且根据业务需求修改参数设置
binlog_format=ROW
# binlog_row_image 生产环境建议设置为 FULL,记录变更前后所有字段的值。可以快速恢复误操作的数据。
binlog_row_image=FULL
# binlog_rows_query_log_events 设为 ON 时,会记录变更时的 SQL 语句
binlog_rows_query_log_events=ON
需开启 CKafka 服务。
需开启 CLS 服务。

操作步骤

步骤1:创建数据接入连接

1. 登录 CKafka 控制台
2. 在左侧导航栏单击连接器 > 连接列表,选择好地域后,单击新建连接
3. 连接类型选择 MySQL,然后单击下一步。n


4. 选择有 Binlog 权限的用户,填写 MySQL 连接配置后,单击下一步等待连接校验完成。n



步骤2:创建数据接入任务

1. 在左侧导航栏单击任务管理 > 任务列表,选择好地域后,单击新建任务
2. 任务类型选择数据接入,接入方式选择 MySQL 数据订阅,单击下一步。n


3. 数据源选择刚刚新建的 MySQL 连接database 留空表示监听所有数据库的变更。table 留空表示监听某个数据库的所有表的变更。复制存量数据可以根据业务需求选择是否打开,然后单击下一步。n


4. 根据业务需求配置数据目标然后点击提交
注意
为保证 Binlog 事件的全局顺序性,请设置 Topic 分区数请设置为 1 。
5. 当 MySQL 数据发送变化时,可以在目标 Topic 中查看到新增的消息。n


数据目标为 CKafka 实例的 Topic,可以在侧边栏点击消息查询进行查看;
数据目标为单独 Topic 时,可以在侧边栏点击 Topic 列表,然后点击 Topic 进入详情页,再点击查看消息

步骤3:创建数据流出任务

1. 在左侧导航栏单击任务管理 > 任务列表,选择好地域后,单击新建任务
2. 任务类型选择数据流出,数据目标选择日志服务(CLS),单击下一步。n


3. 填写任务详情,选取与数据接入任务相同的 CKafka 实例和 Topic,保证在消息生产后能直接进行消费。n


4. 配置好数据处理规则和数据目标后,单击提交,等待任务显示健康,即表示创建成功。
说明
当任务在健康的状态时, Topic 有新增的消息写入,会直接被消费到指定的 CLS 日志主题中。

步骤4:查看流出数据

1. 登录 日志服务 控制台。
2. 在左侧导航栏选择检索分析,选择流出时填写的日志集与日志主题的“ID”,即可看到 MySQL 的变更记录。n


3. 通过关键字检索等操作,能直观得到所需要的记录。