前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据中间件如何与MySQL数据同步?

数据中间件如何与MySQL数据同步?

作者头像
GreatSQL社区
发布2023-02-24 10:02:09
1.3K0
发布2023-02-24 10:02:09
举报
文章被收录于专栏:GreatSQL出品技术文章

* GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。

  • 1.引入
  • 2.传统方案介绍
  • 3.监控binlog实现"同步"更新
  • 4.总结

1.引入

先前介绍了ElasticSearch,以及ES配合MySQL的问题,这种方案是让ES上的数据根据MySQL的数据做对照从而形成对应的索引,再将数据通过处理和封装存放在ES当中。(可回顾:技术分析 | 浅析MySQL与ElasticSearch的组合使用)回到生产环境,我们如何保证MySQL中与ES对照的数据发生更新的时候ES也进行更新呢?就以ES为例。

2.传统方案介绍

2.1直接的"同步"更新

第一种方式十分直接,当发生对MySQL数据更新操作时,由服务器对MySQL和ES同时进行更新操作,如图:

这种方式实现起来十分“简单粗暴”,容易理解。这种实现方法显然可以解决这个问题,但绝不是最优解,原因如下:

  • 首先,这种方法使得我们进行数据库的数据写入、修改、删除等操作,后面都要跟上ES的同步操作,代码书写也过于冗长,且大大加大了业务的耦合度。
  • 其次,这种方法不能很好解决“同步”的问题,如果在执行对应操作的时候,发生了断电等情况,就有可能导致数据不同步的问题。
  • 最后,为了保证两者的更新要么同时完成要么都不完成,需要开启事务来处理,系统的性能有所降低,同时,在高并发情况下,有可能造成服务的“雪崩”。
2.2异步的"同步"更新

针对前面的方案,可以考虑加入消息队列的中间件来优化,与第一种方法不同的是当发生对MySQL数据更新操作时,服务器会完成MySQL数据的更新,并通过MQ的队列通过设置好的交换机发送更新ES的消息给对应的接收更新消息的队列,然后完成对应ES数据更新的实现。如图:

这种方案将直接的更新方式转换为异步的更新方式,性能上显然提高了,同时降低了业务耦合度,也优化了数据“同步”的问题。但是,这种方案会出现MQ的消费者在消费时可能会因为网络等原因导致用户数据有延时。同时,从编码角度上看,每次系统要进行同步的时候都要编写MQ代码,仍然存在业务的耦合,同时系统架构的设计也因为加入新的中间件要重新考虑维护的问题。

3.监控binlog实现"同步"更新

上面两种方案中都存在硬编码问题,同时存在强的业务耦合,以至于实现MySQL数据更新后的数据同步问题的代价要么是植入ES更新代码,要么替换为MQ代码,代码的侵入性太强,且性能降低。因此可以通过监控MySQL的binlog来实现数据的同步。

3.1问题分析

binlog,该日志存在于Server层次中,是使用存储引擎都可以使用的日志模块,binlog是逻辑日志,记录的是这个语句的原始逻辑,比如“给test表id=5这一行的col1字段值加1”。binlog的日志文件是可以追加写入的。“追加写入”是指binlog日志文件写到一定大小后会切换到下一个文件进行写入,可以设置sync_binlog为1,让每次事务的binlog都持久化保存到磁盘中。binlog在ROW模式下会记录每次操作后每行记录的变化。虽然此模式下所占用的空间较大,但此模式可以保持数据的一致性。因此不管SQL是什么,引用了什么函数,他记录的是执行后的效果。

3.2使用Canal来监控binlog

Canal是阿里用Java开发的基于数据库增量的日志解析,是提供增量数据订阅&消费的中间件。目前,Canal主要支持了 MySQLbinlog 解析,解析后可利用 Canal Client 来处理获得的相关数据。

详细可参考:https://github.com/alibaba/canal/wiki

Canal的实现原理基于MySQL主从复制进行设计:

  • Master主库将改变记录到逻辑日志(binary log)中(这些记录叫做逻辑日志事件,binary log events,可以通过show binlog events进行查看);
  • Slave从库将Master主库的binary log events拷贝到它的中继日志(relay log);
  • Slave从库读取从重做中继日志中的事件,将改变反映它自己的数据同步到数据库中。

而Canal就是将自身伪装成一个Slave从库,假装从Master主库复制数据:

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议;
  • MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal);
  • Canal解析binary log对象(原始为byte流)。

这种方案的好处是程序中没有代码侵入、没有硬编码。同时,原有系统不需要任何变化对原方案的高耦合进行了业务解耦,不需要关注原来系统的业务逻辑。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

代码语言:javascript
复制
[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW 
# 指定开启binlog的数据库,不指定则全部数据库开启
binlog-do-db=databasename
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1 

创建canal账户,授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

代码语言:javascript
复制
[mysqld]
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载并启动Canal

代码语言:javascript
复制
wget https://github.com/alibaba/canal/releases/download/canal-1.1.2/canal.deployer-1.1.2.tar.gz

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.2.tar.gz  -C /tmp/canal

修改Canal的配置文件

代码语言:javascript
复制
vi conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

Canal操作

代码语言:javascript
复制
# 启动
sh bin/startup.sh
# 查看server日志
vi logs/canal/canal.log</pre>
# 查看 instance 的日志
vi logs/example/example.log
# 关闭
sh bin/stop.sh

以Java为例,创建测试项目Maven工程,导入应用开发场景:

代码语言:javascript
复制
    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

编写日志监视类CanalClient来从日志中抓取信息,首先,获取canal的连接对象并连接:

代码语言:javascript
复制
//获取 canal 连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"example", "canal","canal");
//连接
canalConnector.connect();

指定需要监控的数据库,并根据数据量来获取 Message :

代码语言:javascript
复制
//指定要监控的数据库
canalConnector.subscribe("databasename.*");
//获取 Message
Message message = canalConnector.get(100);

接着就可以通过处理 Message 来得到监控信息内容了:

代码语言:javascript
复制
List<CanalEntry.Entry> entries = message.getEntries();
    if (entries.size() > 0) {
    for (CanalEntry.Entry entry : entries) {
        //获取表名
        String tableName = entry.getHeader().getTableName();
        //Entry 类型
        CanalEntry.EntryType entryType = entry.getEntryType();
        //判断 entryType 是否为 ROWDATA
        if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
            //序列化数据
            ByteString storeValue = entry.getStoreValue();
            //反序列化
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
            //获取事件类型
            CanalEntry.EventType eventType = rowChange.getEventType();
            //获取具体的数据
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            //遍历并打印数据
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                JSONObject beforeData = new JSONObject();
                for (CanalEntry.Column column : beforeColumnsList) {
                    beforeData.put(column.getName(), column.getValue());
                }
                JSONObject afterData = new JSONObject();
                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                for (CanalEntry.Column column : afterColumnsList) {
                    afterData.put(column.getName(), column.getValue());
                }
                System.out.println("TableName:" + tableName
                        +
                        ",EventType:" + eventType +
                        ",Before:" + beforeData +
                        ",After:" + afterData);
            }
        }
    }
}

从代码中可以看出,当系统与Canal建立连接后可以获取Message来监控数据库的操作,Message是一次Canal从MySQL的 bin log 中抓取的信息,一个Message中可以有多个SQL执行的结果,每个SQL执行结果(SQL命令)称为Entry,如图:

Entry中包含 TableName 、 EntryType 和 StoreValue ,其中 StoreValue 包含了数据变化的内容。如下:

要想进行使用还需要进行反序列化操作才可以进行使用,如下:

当然,实际生产环境Canal可以配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。首先需要修改canal.properties文件,这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认 tcp,改为输出到 kafka,instance.properties文件输出得到主题为kafka,可配置集群,再次启动canal就可以启动 Kafka 消费客户端测试,查看消费情况了。

4.总结

本文介绍了三种方式使得中间件的数据与MySQL的数据保存同步,前两种方法在使用性能和设计上都存在较大漏洞,而第三种通过读取MySQL的bin log日志,获取指定表的日志信息来实现数据同步的方法,在编码上看没有代码侵入,业务耦合度低,且原有系统不需要任何变化,但,构建bin log监控系统需要做好规划,不多赘述了。

Enjoy GreatSQL :)


点击小程序留言


《深入浅出MGR》视频课程

戳此小程序即可直达B站

https://www.bilibili.com/medialist/play/1363850082?business=space_collection&business_id=343928&desc=0


文章推荐:


关于 GreatSQL

GreatSQL是由万里数据库维护的MySQL分支,专注于提升MGR可靠性及性能,支持InnoDB并行查询特性,是适用于金融级应用的MySQL分支版本。

Gitee: https://gitee.com/GreatSQL/GreatSQL

GitHub: https://github.com/GreatSQL/GreatSQL

Bilibili:

https://space.bilibili.com/1363850082/video

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 GreatSQL社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.引入
  • 2.传统方案介绍
    • 2.1直接的"同步"更新
      • 2.2异步的"同步"更新
      • 3.监控binlog实现"同步"更新
        • 3.1问题分析
          • 3.2使用Canal来监控binlog
          • 4.总结
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档