前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MySQL:基于Spring监听Binlog日志

MySQL:基于Spring监听Binlog日志

原创
作者头像
GoBoy
修改2023-11-17 08:12:13
1.1K2
修改2023-11-17 08:12:13
举报
文章被收录于专栏:Goboy

binlog的三种模式

MySQL 的二进制日志(binlog)有三种不同的格式,通常被称为 binlog 模式。这三种模式分别是 Statement 模式、Row 模式和Mixed 模式。

Statement 模式:

  • 在 Statement 模式下,MySQL 记录每个会更改数据的 SQL 语句。
  • binlog 记录的是执行的 SQL 语句本身,而不是具体的数据变化。
  • 例如,如果执行了 UPDATE 语句,binlog 记录的是这个 UPDATE 语句的文本。

Row 模式:

  • 在 Row 模式下,MySQL 记录每一行数据的变化。
  • binlog 记录的是行数据的变化,而不是 SQL 语句。
  • 例如,如果执行了 UPDATE 语句,binlog 记录的是被修改的行的实际数据。

Mixed 模式:

  • Mixed 模式是 Statement 模式和 Row 模式的结合。
  • 在 Mixed 模式下,MySQL 根据执行的 SQL 语句的类型来决定是记录语句还是记录行。
  • 通常,对于简单的语句,使用 Statement 模式,对于涉及到行变化的复杂语句,使用 Row 模式。

这些模式可以通过 MySQL 配置文件中的 binlog_format 参数进行配置。例如:

代码语言:javascript
复制
[mysqld]
binlog_format=mixed

其中,statementrowmixed 分别代表 Statement 模式、Row 模式和 Mixed 模式。选择适当的 binlog 模式取决于应用的特定需求和性能要求。不同的模式具有不同的优劣势,例如,Statement 模式可能会更轻量,而 Row 模式可能提供更详细的数据变化信息。

以Mixed 为例

查看binlog是否开启

代码语言:javascript
复制
show variables like '%log_bin%'

启动springboot程序

新建数据库

这个事件是一个 binlog 事件,其内容表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:

  • 事件类型 (eventType): 该事件的类型是 QUERY,表示这是一个 SQL 查询事件。
  • 时间戳 (timestamp): 事件的时间戳为 1700045267000,表示事件发生的时间。
  • 线程ID (threadId): 线程ID 是 189,表示执行这个查询的线程的标识符。
  • 执行时间 (executionTime): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (errorCode): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (database): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • SQL 查询 (sql): 实际的 SQL 查询为 CREATE DATABASE test2023 CHARACTER SET utf8 COLLATE utf8_general_ci,表示执行了创建数据库的操作。

这个事件的作用是在 test2023 数据库中执行了一个创建数据库的 SQL 查询。这是 binlog 中的一部分,用于记录数据库中的变化,以便进行数据备份、主从同步等操作。

新建表数据

代码语言:javascript
复制
CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

这个事件也是一个 binlog 事件,表示一个 SQL 查询事件。让我解释一下这个事件的各个部分:

  • 事件类型 (eventType): 该事件的类型是 QUERY,表示这是一个 SQL 查询事件。
  • 时间戳 (timestamp): 事件的时间戳为 1700045422000,表示事件发生的时间。
  • 线程ID (threadId): 线程ID 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (executionTime): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (errorCode): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (database): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • SQL 查询 (sql): 实际的 SQL 查询为 CREATE TABLE t_user(idbigint(20) NOT NULL AUTO_INCREMENT,userName varchar(100) NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4,表示执行了在 test2023 数据库中创建名为 t_user 的表的操作。

这个事件的作用是在 test2023 数据库中创建了一个名为 t_user 的表,该表包含 iduserName 两个字段,其中 id 是自增的主键。这种类型的事件常常用于记录数据库结构的变化,以便进行数据备份、迁移和版本控制等操作。

插入表数据

代码语言:javascript
复制
INSERT INTO `test2023`.`t_user` (`id`, `userName`)
VALUES
    (
        "10086",
        "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。"
    );

这个事件也是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

  • 事件类型 (eventType): 该事件的类型是 QUERY,表示这是一个 SQL 查询事件。
  • 时间戳 (timestamp): 事件的时间戳为 1700045547000,表示事件发生的时间。
  • 线程ID (threadId): 线程ID 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (executionTime): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (errorCode): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (database): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • SQL 查询 (sql): 实际的 SQL 查询为 INSERT INTO test2023.t_user (id, userName) VALUES ( "10086", "用心记录技术,走心分享,始于后端,不止于后端,励志成为一名优秀的全栈架构师,真正的实现码中致富。",表示执行了向 test2023 数据库的 t_user 表中插入一行数据的操作。

这个事件的作用是向 t_user 表中插入了一行数据,包含了 iduserName 两个字段的值。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

修改表数据

代码语言:javascript
复制
UPDATE `test2023`.`t_user`
SET `id` = '10086',
 `userName` = '我的修改数据!!!'
WHERE
	(`id` = '10086');

这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

  • 事件类型 (eventType): 该事件的类型是 QUERY,表示这是一个 SQL 查询事件。
  • 时间戳 (timestamp): 事件的时间戳为 1700045675000,表示事件发生的时间。
  • 线程ID (threadId): 线程ID 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (executionTime): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (errorCode): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (database): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • SQL 查询 (sql): 实际的 SQL 查询为 UPDATE test2023.t_userSETid= '10086', userName = '我的修改数据!!!' WHERE (id = '10086'),表示执行了更新 test2023 数据库中的 t_user 表中一行数据的操作。

这个事件的作用是将 t_user 表中 id10086 的行的数据进行更新,将 id 修改为 10086userName 修改为 '我的修改数据!!!'。这种类型的事件通常用于记录数据的变化,以便进行数据备份、同步和迁移等操作。

删除表数据

代码语言:javascript
复制
DELETE
FROM
	t_user
WHERE
	id = '10086';

这个事件同样是一个 binlog 事件,表示一个 SQL 查询事件,具体如下:

  • 事件类型 (eventType): 该事件的类型是 QUERY,表示这是一个 SQL 查询事件。
  • 时间戳 (timestamp): 事件的时间戳为 1700045755000,表示事件发生的时间。
  • 线程ID (threadId): 线程ID 是 204,表示执行这个查询的线程的标识符。
  • 执行时间 (executionTime): 执行时间为 0,表示执行这个查询所花费的时间。
  • 错误代码 (errorCode): 错误代码为 0,表示查询执行没有错误。
  • 数据库 (database): 数据库为 test2023,表示这个查询发生在 test2023 数据库中。
  • SQL 查询 (sql): 实际的 SQL 查询为 DELETE FROM t_user WHERE id = '10086',表示执行了删除 test2023 数据库中的 t_user 表中一行数据的操作。

这个事件的作用是删除 t_user 表中 id10086 的行。这种类型的事件通常用于记录数据的删除操作,以便进行数据备份、同步和迁移等操作。

总结: binlog_format 设置为 mixed 时,对于 INSERT、UPDATE 和 DELETE 操作,它们在 binlog 中的事件类型都会被表示为 QUERY 事件。这是因为在 mixed 模式下,MySQL 使用了不同的方式来记录不同类型的操作,但在 binlog 中,它们都被包装成了 QUERY 事件。

在 mixed 模式下:

  • 对于某些语句级别的操作(例如非确定性的语句或不支持事务的存储引擎),会使用 STATEMENT 事件。
  • 对于其他一些情况,会使用 ROW 事件,将变更的行作为事件的一部分进行记录。

这就是为什么看到的 INSERT、UPDATE 和 DELETE 操作的事件类型都是 QUERY。在处理这些事件时,需要根据具体的 SQL 查询语句或其他信息来确定操作的类型。

源码示例

pom.xml

代码语言:html
复制
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.48</version> <!-- 查看最新版本 -->
		</dependency>
			
		<dependency>
			<groupId>com.github.shyiko</groupId>
			<artifactId>mysql-binlog-connector-java</artifactId>
			<version>0.21.0</version>
		</dependency>

Java示例

代码语言:javascript
复制
package com.example.demo.listener;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.AuthenticationException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class BinlogListenerMixed {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);

    private static final String MYSQL_HOST = "8.130.74.105";
    private static final int MYSQL_PORT = 3306;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "zhang.ting.123";

    public static void main(String[] args) {
        try {
            BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
//            client.setBinlogFilename(null);
//            client.setBinlogPosition(-1); // 或者设置为其他适当的初始位置
//            client.setServerId(1);
//            client.setBinlogFilename("mysql-bin.000005");
//            client.setBinlogPosition(154);
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
            );
            logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }

                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }

                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }

                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // 在这里添加重新连接或其他处理逻辑
                }
            });

            client.connect();
        } catch (IOException e) {
            logger.error("@@ 连接到 MySQL 时发生错误", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }

    private static void handleEvent(Event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());

        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            // 其他事件处理...
        }
    }

    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        List<Serializable[]> rows = eventData.getRows();

        // 获取表名
        String tableName = getTableName(eventData);

        // 处理每一行数据
        for (Serializable[] row : rows) {
            // 根据需要调整以下代码以获取具体的列值
            String column1Value = row[0].toString();
            String column2Value = row[1].toString();

            // 将数据备份到另一个数据库
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }

    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);

        // 解析SQL语句,根据需要处理
        // 例如,检查是否包含写入操作,然后执行相应的逻辑
    }

    private static void handleTableMapEvent(TableMapEventData eventData) {
        // 获取表映射信息,根据需要处理
        logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
    }

    private static String getTableName(EventData eventData) {
        // 获取表名的逻辑,可以使用TableMapEventData等信息
        // 根据实际情况实现
        return "example_table";
    }

    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        // 将数据备份到另一个数据库的逻辑
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
    }
}

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • binlog的三种模式
  • 以Mixed 为例
    • 查看binlog是否开启
      • 启动springboot程序
        • 新建数据库
          • 新建表数据
            • 插入表数据
              • 修改表数据
                • 删除表数据
                • 源码示例
                  • pom.xml
                    • Java示例
                    相关产品与服务
                    云数据库 MySQL
                    腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档