文档中心>实践教程>云数据库 PostgreSQL>使用 Debezium 采集 PostgreSQL 数据

使用 Debezium 采集 PostgreSQL 数据

最近更新时间:2025-05-15 17:02:02

我的收藏
业务应用场景中,常常需要实时捕获数据库的数据变更并将其同步至其他系统。该过程可通过 Debezium 实现, Debezium 用于监控数据库变化和捕捉数据变动事件,并以事件流的形式导出。
本文将说明如何使用 Debezium 采集云数据库 PostgreSQL 中的数据。

前提条件

准备处于同一 VPC 下的云数据库 PostgreSQL 实例和云服务器实例。

步骤1:部署环境

1.云服务器配置 java 环境

Debezium 属于 java 应用,需要在云服务器配置 java 环境,为其正常运行提供基础。
依次执行下方命令,下载 jdk18 安装包并解压。
#下载jdk8
[root@VM-10-18-tencentos ~]# wget --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" \\
https://download.oracle.com/java/18/archive/jdk-18.0.2_linux-x64_bin.tar.gz
#解压安装包
[root@VM-10-18-tencentos ~]# tar -zxvf jdk-18.0.2_linux-x64_bin.tar.gz -C /usr/local/
 
#重命名目录
[root@VM-10-18-tencentos ~]# sudo mv /usr/local/jdk-18.0.2 /usr/local/jdk18
执行下列命令,进入配置文件内容。
[root@VM-10-18-tencentos ~]# vim /etc/profile
按 i 键进入编辑模式,在文件末尾添加以下内容:
export JAVA_HOME=/usr/local/jdk18
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
添加完毕后,按 esc 键退出编辑模式,再输入 :wq 保存修改并退出文件内容。
执行以下命令使配置立即生效。
[root@VM-10-18-tencentos ~]# source /etc/profile
可通过以下命令检查 java 是否配置成功。
[root@VM-10-18-tencentos ~]# java -version
java version "18.0.2" 2022-07-19
Java(TM) SE Runtime Environment (build 18.0.2+9-61)
Java HotSpot(TM) 64-Bit Server VM (build 18.0.2+9-61, mixed mode, sharing)
如果显示 java 版本信息,说明配置成功。

2.本地 Kafka 部署

您可选择手动在官网 Apache Kafka 下载自己需要的版本的二进制包( Binary download ),然后上传到 CVM 上。具体请参见 Linux 系统通过 FTP 上传文件到云服务器
也可直接执行以下命令下载,版本号可根据实际需要自行替换。
[root@VM-10-18-tencentos ~]# wget https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
将 kafka 安装包下载到 CVM 之后,依次执行以下命令完成安装。
#创建kafka的安装目录
[root@VM-10-18-tencentos ~]# mkdir -p /data/zookeeper
#解压kafka安装包
[root@VM-10-18-tencentos ~]# tar -zxvf kafka_2.13-3.7.2.tgz -C /data/
 
#重命名解压后的目录
[root@VM-10-18-tencentos ~]# cd /data/
[root@VM-10-18-tencentos data]# mv kafka_2.13-3.7.2 kafka_dev
执行下行命令,进入 Zookeeper 配置文件。
root@VM-10-18-tencentos data]# cd /data/kafka_dev/config
[root@VM-10-18-tencentos config]# vim /data/kafka_dev/config/zookeeper.properties
进入文件内后,按i键进入编辑模式,找到 dataDir ,将其修改为 /data/zookeeper ,确保 dataDir 指向正确的存储路径。
dataDir =/data/zookeeper
修改完成后,按 esc 键退出编辑模式,再直接输入 :wq 保存修改并退出文件内容。

3.修改 Kafka 配置文件

执行以下命令,创建 kafka 日志目录。
[root@VM-10-18-tencentos config]# mkdir -p /data/kafka_dev/logs/
执行以下命令,进入 kafka 配置文件。
[root@VM-10-18-tencentos config]# vim connect-distributed.properties
进入文件后,按 i 键进入编辑模式,修改以下内容。若云服务器与云数据库处于同一 VPC 下,建议填写云服务器的内网 IP 地址。
listeners=PLAINTEXT://部署kafka所在机器的ip:9092    #若该项所在行首有#号,需将#删去并修改
log.dirs=/data/kafka_dev/logs/connect.log
zookeeper.connect=部署kafka所在机器的ip:2181
执行以下命令,进入 kafka connect 的配置文件 connect-distributed.properties 。
[root@VM-10-18-tencentos config]# vim connect-distributed.properties
进入文件后,按 i 键进入编辑模式,修改以下内容。若云服务器与云数据库处于同一 VPC 下,建议填写云服务器的内网 IP 地址。
group.id=connect-cluster
bootstrap.servers=部署kafka所在机器的ip:9092
# 定义插件路径
plugin.path=/data/kafka_connect/plugins

4.启动 Zookeeper 和 Kafka

使用以下命令启动 zookeeper 。
[root@VM-10-18-tencentos config]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &> zookeeper.log &
可输入以下命令确认 zookeeper 任务是否正常在后台运行。
[root@VM-10-18-tencentos kafka_dev]# jobs
若返回信息包含 zookeeper 和 running ,则正常运行。
再执行以下命令启动 kafka 。
[root@VM-10-18-tencentos kafka_dev]# nohup bin/kafka-server-start.sh config/server.properties &> kafka.log &
可输入以下命令确认 kafka 任务是否正常在后台运行。
[root@VM-10-18-tencentos kafka_dev]# jobs
若返回信息包含 kafka 和 running,则正常运行。

步骤2:在 PostgreSQL 中创建逻辑复制发布

逻辑发布( Publication )定义了哪些表的数据变更会被发布,Debezium 通过绑定到逻辑发布上的逻辑复制槽( Failover Slot )来捕获变更数据。对于逻辑复制槽的具体说明请参见 逻辑复制槽故障转移(Failover Slot)。因此,您需要创建 publication 和 failover slot ,才能实现数据的捕获和同步。

1.开启逻辑复制

进入控制台,找到需要采集数据的实例,在实例详情页面点击参数设置,将 wal_level 参数默认值修改为为 logical ,参数值修改后需要重启实例才能生效。

修改后,可登录实例,使用以下查询语句查看 wal_level 是否修改成功。
show wal_level;


2.创建 Publication (逻辑发布)

使用类型为 pg_tencentdb_superuser 的账号,登录需要发布的数据库控制台。执行以下命令创建 publication 。
CREATE PUBLICATION pg_demo_publication FOR ALL TABLES;
其中, pg_demo_publication 指该 publication 的名称,您可自行定义, FOR ALL TABLES 指将当前数据库中的全部表都进行发布。若您需要指定发布哪几张表,则可选择执行以下命令:
CREATE PUBLICATION pg_demo_publication FOR table_name1, table_name2;
您可执行以下命令查看刚刚创建的 publication ,确认要发布的表。
SELECT * FROM pg_publication_tables WHERE pubname = ‘pg_demo_publication’;
若希望确认有哪些操作会进行发布,可执行以下命令查看。
SELECT * FROM pg_publication WHERE pubname = ‘pg_demo_publication’;
pg_publication 表用于存储所有已创建的 publication 信息。其中, puballtables 列为 true ,则表示发布数据库中的所有表; pubinsert 列为 true ,表示发布表的 insert 操作,其他列相同。
执行以下命令,创建 tencentdb_failover_slot 插件。
CREATE EXTENSION tencentdb_failover_slot;
执行以下命令,创建 logical_failover_slot 。
SELECT pg_create_logical_failover_slot(‘failover_alot_name’,’pgoutput’);
创建完成后,可使用以下命令查看 Failover Slot 信息。
SELECT * FROM pg_failover_slots;

步骤3:开启 Debezium

1.安装 Debezium 插件

登录云服务器,依次执行以下命令下载 debezium-connector-postgresql 插件,并解压到指定路径。
[root@VM-10-18-tencentos ~]# mkdir -p /data/kafka_connect/plugins
[root@VM-10-18-tencentos ~]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.3.Final/debezium-connector-postgres-2.7.3.Final-plugin.tar.gz
[root@VM-10-18-tencentos ~]# tar -zxvf debezium-connector-postgres-2.7.3.Final-plugin.tar.gz -C /data/kafka_connect/plugins

2.启动 kafka connect

启动 kafka connect 前,请确保已经启动了 kafka 。启动及确认方法请参见 步骤1
执行以下命令启动 kafka connect 。
[root@VM-10-18-tencentos ~]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/connect-distributed.sh config/connect-distributed.properties &> connect.log &

3.创建 debezium connector

在云服务器执行下面的命令,进行 debezium connector 的创建。需要自行根据实际情况填写的项已为您标出。
说明:
若云服务器与云数据库 PostgreSQL 处于同一 VPC 下,建议填写云服务器机器、云数据库的内网 IP 。
curl -XPOST "http://部署kafka所在云服务器的ip:8083/connectors/" \\ 
-H 'Content-Type: application/json' \\ 
-d '{   
"name": "test_connector",   
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "PostgreSQL数据库所在机器的IP",
"database.port": "5432",
"database.user": "有发布权限的PostgreSQL用户名",
"database.password": "发布用户的密码",
"database.dbname": "创建publication的数据库",
"database.server.name": "pg_demo", 
"slot.name": "pg_demo_failover_slot",
"topic.prefix": "pg_demo", 
"publication.name": "pg_demo_publication",
"publication.autocreate.mode": "all_tables",
"plugin.name": "pgoutput"
}
}'
需要您自行填写及可自定义的参数说明如下:
参数
说明
name
连接器的名称,必须唯一。
database.hostname
云数据库的 IP 地址。建议您填写内网 IP。
database.user
用于连接云数据库的用户名。
该用户需要有足够权限完成发布。推荐使用类型为 pg_tecenten_superuser 的用户。
database.password
用户的密码。
database.dbname
创建 publication 的数据库名称。
slot.name
逻辑复制槽的名称。
请填写之前创建的逻辑复制槽名称。
publication.name
逻辑发布的名称。
请填写之前创建的逻辑发布名称。
您可登录控制台,使用以下命令查看 publication 。
SELECT * FROM pg_publication;
可使用以下命令查看 failover Slot 信息。
SELECT * FROM pg_failover_slots;
创建完毕后,您可通过在云服务器输入以下命令查看连接状态。
curl "http://部署kafka所在机器的ip:8083/connectors/pg_demo_connector/status"
返回的信息中包含 running 则为正常运行。

步骤4:测试数据变更

执行以下命令在 CVM 中登录云数据库。其中 -h 后填写为云数据库 IP。若云服务器与云数据库处于同一 VPC 下,建议填写为内网 IP。
[root@VM-10-18-tencentos kafka_dev]# su – postgres
[postgres@VM-10-18-tencentos ~]$ /usr/local/pgsql/bin/psql -h *.*.*.* -p 5432 -U dbadmin -d postgres
Password for user dbadmin:
psql (16.4, server 16.8)
Type "help" for help.
 
postgres=>
创建一张新表,插入数据进行测试。
postgres=> CREATE TABLE linktest (
    id SERIAL PRIMARY KEY
);
CREATE TABLE
postgres=> insert into linktest values(1);
INSERT 0 1
观察 kafka 日志,若 kafka 正常,则连接建立成功。