MySQL 数据订阅

最近更新时间:2022-11-25 15:39:00

我的收藏

简介

MySQL 通过一种二进制日志(binlog)来按序记录所有提交给数据库的操作,包括对表结构的修改以及对表中数据的修改。MySQL 通过 binlog 来进行备份或恢复数据。
Debezium MySQL connector 通过读取 binlog 来生成行级(row-level)的数据库修改事件(event),包括 INSERT、UPDATE 和 DELETE,并将事件发送给 kafka 中相应的 topic 。客户端应用可以通过消费对应 topic中的消息来对数据库修改事件进行处理,从而达到监控特定数据库的目的。
支持订阅的 SQL 操作:
操作类型
支持的SQL操作
DML
INSERT、UPDATE、DELETE
DDL
CREATE DATABASE、DROP DATABASE、CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE

本文档是根据 Debezium 官方文档进行整理和归纳而来。详情参见 Debezium connector for MySQL

事件格式

Debezium MySQL connector 针对每一次插入、更新、删除操作生成数据变更事件。每一个事件(event)在作为消息提交给 kafka 的主题(Topic),Topic里每条消息包含 key 和 value 两部分,示例如下:



Kafka 每条消息的key 和 value 都包含 schema 和 payload 两个字段。格式如下:
{
"schema": {
...
},
"payload": {
...
}
}
key 字段说明:
Item
Field name
Description
1
schema
schema 字段描述了 key 的 payload 字段的结构,即它描述了被修改的表的主键(primary key)结构,如果表没有主键,则描述其唯一约束(unique key)的结构。
2
payload
payload 字段的结构和第一个 schema 中描述的相同,包含了被修改的行的键值。

value 字段说明:
Item
Field name
Description
1
schema
schema 字段描述了 value 的payload 字段的结构,即描述了被修改行的字段结构。这个字段通常是一个嵌套结构的字段。
2
payload
payload 字段的结构和第二个 schema 中定义的相同,它包含被修改行的真实数据。

事件消息 key

不同类型事件的消息都有一样的 key 结构,下面给出一个示例,一个修改事件的 key 包含被修改的表的主键结构以及对应行的实际主键值。
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
每一个捕获 customers 表修改操作的事件 key 中的 schema 都相同。该操作对应的事件消息的 key 如下所示(JSON表示):
{
"schema": {
"type": "struct",
"name": "mysql-server-1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1001
}
}
Item
Field name
Description
1
schema
Schema 描述了 payload 中的结构。
2
mysql-server-1.inventory.customers.Key
schema 的名称格式为 *connector-name*.*database-name*.*table-name*.Key。在这个例子中: mysql-server-1 是生成事件的connector的名字。inventory 是对应数据库的名字。 customers 是表的名字。
3
optional
表示字段是否是可选项。
4
fields
列出了所有 payload 中包含的字段及其结构, 包括字段名、字段类型、以及是否可选。
5
payload
包含被修改行的主键。在例子中仅包含一个字段名为 id 的主键值: 1001。

DML 事件

前面介绍了一个事件消息的 key 的结构,不同类型事件的 key 结构是相同的。本节列举了不同的事件类型,介绍了这些事件类型各自的 value 结构。

新增事件(create events)

下面这个例子展示了在表中新增数据的时候 connector 生成的事件消息的 value 部分:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope"
},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.9.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
}
}
}
Item
Field name
Description
1
schema
Schema 描述了 payload 中的结构, 其中 schema 中的 fields 字段为一个数组,表示 payload 字段包含了多个字段,数组的每个元素是对 payload 中相应字段结构的描述信息。
2
field
每一个 fields 中的元素都包含一个 field 字段,该字段表示 payload 中对应字段的名称。在示例中包括 before、after、source等。
3
type
表示字段的类型,如整型(int)、字符串(string)等.
4
mysql-server-1.inventory.customers.Value
表示该字段是 mysql-server-1 连接器生成的针对 inventory 数据库的 customers 表的 value 部分信息。

io.debezium.connector.mysql.Source
该名称和特定的 connector 绑定,由该 connector 生成的事件该名称都相同。
6
payload
包含修改事件中具体被修改的数据,包括修改前(before 字段)和修改后(after 字段)的数据,以及一些 connector 的元数据信息(source 字段)。
7
op
表示导致事件生成的修改操作的类型,例子中的 c 表示 修改操作创建了一个新的行。c = createu = updated = deleter = read (仅 snapshots)
8
source
source 字段是一个描述事件元数据的字段。它包含的一些字段可以用来与其他事件做比较,如比较事件生成的顺序、事件是否属于同一个事务等。该字段包含以下元数据信息: Debezium versionConnector namebinlog name where the event was recordedbinlog positionRow within the eventIf the event was part of a snapshotName of the database and table that contain the new rowID of the MySQL thread that created the event (non-snapshot only)MySQL server ID (if available)Timestamp for when the change was made in the database
9
query
修改操作的原始 SQL 语句。

更新事件(update events)

下面这个例子展示了更新操作生成的事件的 value 部分:
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.9.3.Final",
"name": "mysql-server-1",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "u",
"ts_ms": 1465581029523
}
}
其中 schema 字段和新增操作的事件相同,而 payload 部分有所不同,在新增事件中,before 字段为 null,表示没有原始数据,而更新事件中包含了更新前(before)和更新后(after)的数据。

删除事件(delete events)

下面这个例子展示了删除操作生成的事件的 value 部分:
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "1.9.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581902300,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 805,
"row": 0,
"thread": 7,
"query": "DELETE FROM customers WHERE id=1004"
},
"op": "d",
"ts_ms": 1465581902461
}
}
其中 schema 字段和新增操作的事件相同,而 payload 部分有所不同,删除事件中包含了更新前(before)的数据,但更新后(after)的数据为 null,表示数据已删除。

更新主键(primary key updates)

如果一个操作修改了数据表中某行的主键,那么 connector 会生成一条删除事件来表示原主键对应的数据行删除,同时生成一条新增事件来表示插入的新主键对应的行。每一条消息的 header 都会和相应的 key 关联。官方描述如下:
The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.

DDL 事件

创建数据库(create database)

下面这个例子展示了创建数据库操作生成的事件的 value 部分:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"tableChanges" : [ ]
}
其中 position 的内容为记录 binlog 文件,消费偏移量等信息。ddl 字段为触发事件的 sql 语句。

删除数据库(drop database)

下面这个例子展示了删除数据库操作生成的事件的 value 部分:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "DROP DATABASE IF EXISTS `dip_test`",
"tableChanges" : [ ]
}
其中 position 的内容为记录 binlog 文件,消费偏移量等信息。ddl 字段为触发事件的 sql 语句。

创建表(create table)

下面这个例子展示了创建表操作生成的事件的 value 部分:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "CREATE TABLE `customers` (\\n `id` int NOT NULL AUTO_INCREMENT,\\n `first_name` varchar(255) NOT NULL,\\n `last_name` varchar(255) NOT NULL,\\n `email` varchar(255) NOT NULL,\\n PRIMARY KEY (`id`),\\n UNIQUE KEY `email` (`email`),\\n KEY `ix_id` (`id`)\\n) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\\"dip_test\\".\\"customers\\"",
"table" : {
"defaultCharsetName" : "utf8",
"primaryKeyColumnNames" : [ "id" ],
"columns" : [ {
"name" : "id",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 1,
"optional" : false,
"autoIncremented" : true,
"generated" : true,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "first_name",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 2,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "last_name",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 3,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "email",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 4,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
其中 position 的内容为记录 binlog 文件,消费偏移量等信息。ddl 字段为触发事件的 sql 语句。columns 字段记录了新增表的不同字段的定义信息。

修改表(alter table)

下面这个例子展示了修改表操作生成的事件的 value 部分:
{
"source" : {
"server" : "1307446078-a123"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1655782153,
"file" : "mysql-bin.000005",
"pos" : 1218,
"gtids" : "ddf040ad-7509-11ec-968b-0c42a1eda2e9:1-8",
"server_id" : 183277
},
"databaseName" : "test",
"ddl" : "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP",
"tableChanges" : [ {
"type" : "ALTER",
"id" : "\\"test\\".\\"user\\"",
"table" : {
"defaultCharsetName" : "utf8",
"primaryKeyColumnNames" : [ ],
"columns" : [ {
"name" : "name",
"jdbcType" : 1,
"typeName" : "CHAR",
"typeExpression" : "CHAR",
"charsetName" : "utf8",
"length" : 20,
"position" : 1,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"defaultValueExpression" : "",
"enumValues" : [ ]
}, {
"name" : "age",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 2,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"enumValues" : [ ]
}, {
"name" : "createtime",
"jdbcType" : 93,
"typeName" : "DATETIME",
"typeExpression" : "DATETIME",
"charsetName" : null,
"position" : 3,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"defaultValueExpression" : "1970-01-01 00:00:00",
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
其中 position 的内容为记录 binlog 文件,消费偏移量等信息。ddl 字段为触发事件的 sql 语句。columns 字段记录了被修改的字段的信息。

删除表(drop table)

下面这个例子展示了删除表操作生成的事件的 value 部分:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "DROP TABLE IF EXISTS `dip_test`.`customers`",
"tableChanges" : [ ]
}
其中 position 的内容为记录 binlog 文件,消费偏移量等信息。ddl 字段为触发事件的 sql 语句。

更改表名

下面这个例子展示了更改操作生成的事件的 value 部分:
{
"schema": {
"type": "struct",
"fields": ···,
"optional": false,
"name": "io.debezium.connector.mysql.SchemaChangeValue"
},
"payload": {
"source": {
"version": "1.9.0.Final",
"connector": "mysql",
"name": "task-lzpx4pdo",
"ts_ms": 1656300979748,
"snapshot": "false",
"db": "testDB",
"sequence": null,
"table": "t_test",
"server_id": 170993,
"gtid": "b24176f2-5409-11ec-80d4-b8599fe5c6ea:80",
"file": "mysql-bin.000006",
"pos": 26411,
"row": 0,
"thread": null,
"query": null
},
"databaseName": "testDB",
"schemaName": null,
"ddl": "rename table test to t_test",
"tableChanges": [{
"type": "ALTER",
"id": "\\"testDB\\".\\"t_test\\"",
"table": {
"defaultCharsetName": "utf8",
"primaryKeyColumnNames": ["id"],
"columns": [{
"name": "id",
"jdbcType": -5,
"nativeType": null,
"typeName": "BIGINT",
"typeExpression": "BIGINT",
"charsetName": null,
"length": 20,
"scale": null,
"position": 1,
"optional": false,
"autoIncremented": true,
"generated": true,
"comment": null
}, {
"name": "name",
"jdbcType": 12,
"nativeType": null,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "utf8",
"length": 20,
"scale": null,
"position": 2,
"optional": true,
"autoIncremented": false,
"generated": false,
"comment": null
}],
"comment": null
}
}]
}
}
其中 schema 中包含的是对 payload 的内容格式信息,这里省略了部分内容,payload 字段 source 为元数据信息,ddl 字段为触发事件的 sql 语句。columns 为受影响的表的字段。