概述
使用 CKafka 连接器订阅 MySQL 的变更操作时,可选多种消息格式,默认采用 debezium 格式,同时提供了兼容其他消息格式的能力。本文介绍兼容 官方自定义格式 的消息格式说明。
“官方格式一”说明
“官方格式一”目前仅支持 DML 消息,DDL 消息格式与 canal 格式一致。
字段名称 | 字段说明 |
BINLOG_NAME | binlog 日志文件名称 |
BINLOG_POS | binlog 日志的 pos 位置 |
DATABASE | 数据库名称 |
EVENT_SERVER_ID | 暂时默认为 null |
GLOBAL_ID | 如果开启 GTID,则为 GTID 信息 |
GROUP_ID | 暂时默认为 null |
NEW_VALUES | type = U,则为更新后的行信息,格式为 json type = D,则为 null type = I,则为新插入的行信息,格式为 json |
OLD_VALUES | type = U,则为更新前的行信息,格式为 json type = D,则为删除的行信息,格式为 json type = I,则为 null |
TABLE | 表名 |
TIME | 日志生成时间 |
TYPE | 日志类型: U:update D:delete l:insert |
DDL 格式
create database
{"data": null,"database": "dip_test","es": 1655812326,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci","sqlType": null,"table": "","ts": 1655812326,"type": "QUERY"}
drop database
{"data": null,"database": "dip_test","es": 1655812326,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "DROP DATABASE IF EXISTS `dip_test`","sqlType": null,"table": "","ts": 1655812326,"type": "QUERY"}
create table
{"data": null,"database": "dip_test","es": 1655812326,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "CREATE TABLE `customers` (`id` int NOT NULL AUTO_INCREMENT,`first_name` varchar(255) NOT NULL,`last_name` varchar(255) NOT NULL,`email` varchar(255) NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `email` (`email`),KEY `ix_id` (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8","sqlType": null,"table": "customers","ts": 1655812326,"type": "CREATE"}
alter table
{"data": null,"database": "test","es": 1655782153,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP","sqlType": null,"table": "user","ts": 1655782153,"type": "ALTER"}
drop table
{"data": null,"database": "dip_test","es": 1655812326,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "DROP TABLE IF EXISTS `dip_test`.`customers`","sqlType": null,"table": "customers","ts": 1655812326,"type": "ERASE"}
rename table
{"data": null,"database": "testDB","es": 1656300979748,"id": 0,"isDdl": true,"mysqlType": null,"old": null,"pkNames": null,"sql": "rename table test to t_test","sqlType": null,"table": "t_test","ts": 1656300979748,"type": "RENAME"}
DML 格式
insert
{"BINLOG_NAME": "mysql-bin.000003","BINLOG_POS": 154,"DATABASE": "inventory","EVENT_SERVER_ID": null,"GLOBAL_ID": null,"GROUP_ID": null,"NEW_VALUES": {"last_name": "Kretchmar","id": "1004","first_name": "Anne","email": "annek@noanswer.org"},"OLD_VALUES": null,"TABLE": "customers","TIME": "19700101080000","TYPE": "I"}
update
{"BINLOG_NAME": "mysql-bin.000003","BINLOG_POS": 484,"DATABASE": "inventory","EVENT_SERVER_ID": null,"GLOBAL_ID": null,"GROUP_ID": null,"NEW_VALUES": {"last_name": "Kretchmar","id": "1004","first_name": "Anne Marie","email": "annek@noanswer.org"},"OLD_VALUES": {"last_name": "Kretchmar","id": "1004","first_name": "Anne","email": "annek@noanswer.org"},"TABLE": "customers","TIME": "20160611015029","TYPE": "U"}
delete
{"BINLOG_NAME": "mysql-bin.000003","BINLOG_POS": 805,"DATABASE": "inventory","EVENT_SERVER_ID": null,"GLOBAL_ID": null,"GROUP_ID": null,"NEW_VALUES": null,"OLD_VALUES": {"last_name": "Kretchmar","id": "1004","first_name": "Anne Marie","email": "annek@noanswer.org"},"TABLE": "customers","TIME": "20160611020502","TYPE": "D"}