PostgreSQL 数据变更记录分析

最近更新时间:2023-05-29 10:52:04

我的收藏

操作场景

PostgreSQL 通过 WAL(Write-Ahead Logging)预写日志机制保证事务持久性和数据完整性,开发者可以将订阅 WAL 应用在增量索引缓存一致性基于数据的任务分发记录数据变更等场景。



本文以 PostgreSQL 接入 CKafka,并从 CKafka 流出到 CLS 为例,讲解如何使用 CKafka 连接器实现 PostgreSQL 数据变更记录分析。

运行原理

CKafka 连接器通过订阅 PostgreSQL WAL,将行级数据变更记录转化为 JSON 格式的消息生产到 CKafka 中。

前提条件

需要开通云数据库 PostgrepSQL,然后修改以下配置。
wal_level=logical
#
# 9.4、9.5、9.6 版本需要根据业务需求设置下面参数
# 10 及以上版本则可以使用默认值
#
max_replication_slots=10
max_wal_senders=10
需开启 CKafka 服务。
需开启 CLS 服务。

操作步骤

步骤1:创建数据接入连接

1. 登录 CKafka 控制台
2. 在左侧导航栏单击连接器 > 连接列表,选择好地域后,单击新建连接
3. 连接类型选择 PostgreSQL,然后单击下一步。n


1. 选择有 REPLICATIONLOGIN 权限的用户,填写 PostgreSQL 连接配置,然后单击下一步。n



步骤2:创建数据接入任务

1. 在左侧导航栏单击任务管理 > 任务列表,选择好地域后,单击新建任务
2. 任务类型选择数据接入,接入方式选择PostgreSQL 数据订阅,单击下一步。n


3. 数据源选择刚刚新建的 PostgreSQL 连接。database 留空表示监听所有数据库的变更。table 留空表示监听某个数据库的所有表的变更。复制存量数据可以根据业务需求选择是否打开,然后单击下一步。n


4. 根据业务需求配置数据目标,然后单击提交
5. 当 PostgreSQL 数据发送变化时,可以在目标 Topic 中查看到新增的消息。n


数据目标为 CKafka 实例的 Topic,可以在左侧导航栏单击消息查询进行查看;
数据目标为单独 Topic 时,可以在左侧导航栏单击 Topic 列表,然后单击 Topic 进入详情页,再单击查看消息

步骤3:创建数据流出任务

1. 在左侧导航栏单击任务管理 > 任务列表,选择好地域后,单击新建任务
2. 任务类型选择数据流出,数据目标选择日志服务(CLS),单击下一步。n


3. 填写任务详情,选取与数据接入任务相同的 CKafka 实例和 Topic,保证在消息生产后能直接进行消费。n


4. 单击提交,等待任务显示健康,即表示创建成功。
说明
当任务在健康的状态时, Topic 有新增的消息写入,会直接被消费到指定的 CLS 日志主题中。

步骤4:查看流出数据

1. 登录 日志服务 控制台。
2. 在左侧导航栏选择检索分析,选择流出时填写的日志集与日志主题的“ID”,即可看到 PostgreSQL 的变更记录。n


3. 通过关键字检索等操作,能直观得到所需要的记录。