首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

数据同步之 Canal mysql cdc 入门实战笔记

Canal

Canal[1],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

基于日志增量订阅和消费的业务包括

•数据库镜像

   •数据库实时备份     •索引构建和实时维护(拆分异构索引、倒排索引等)     •业务 cache 刷新     •带业务逻辑的增量数据处理

windows10 WSL 实战笔记

准备工作

确保已经开启 binlog

binlog 开启[2]

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');+--------------------------+-------+| Variable_name | Value |+--------------------------+-------+| binlog_format | ROW || binlog_row_image | FULL || enforce_gtid_consistency | ON || gtid_mode | ON || log_bin | ON |+--------------------------+-------+

创建 canal 账户

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;flush privileges;

创建表

create database test_source;use test_source;

drop table if exists user_info;create table user_info( id int unsigned auto_increment comment '主键' primary key, username varchar(128) not null comment '用户名', create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间', update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间') comment '用户信息表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

初始化数据

insert into user_info (username) values ('u1');insert into user_info (username) values ('u2');insert into user_info (username) values ('u3');insert into user_info (username) values ('u4');insert into user_info (username) values ('u5');

下载 canal

$ pwd/home/houbinbin/canal

下载 canal, 访问 release 页面 , 选择需要的包下载[3], 如以canal.deployer-1.1.7.tar.gz为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

解压缩

tar -zxvf canal.deployer-1.1.7.tar.gz

如下:

$ lsbin canal.deployer-1.1.7.tar.gz canal.deployer-1.1.7.tar.gz:Zone.Identifier conf lib logs plugin

配置修改

修改对应的配置文件

vi conf/example/instance.properties

## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要改成自己的数据库信息canal.instance.master.address = 127.0.0.1:13306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息canal.instance.dbUsername = canal canal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .\*\\\\..\*

启动

sh bin/startup.sh

查看 server 日志

cat logs/canal/canal.log

日志如下:

2024-01-27 19:46:19.266 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations2024-01-27 19:46:19.280 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.2024-01-27 19:46:19.333 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.23.234.67(172.23.234.67):11111]2024-01-27 19:46:21.108 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

查看 instance 的日志

cat logs/example/example.log

日志如下:

2024-01-27 19:46:20.101 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example2024-01-27 19:46:21.056 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$2024-01-27 19:46:21.057 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$2024-01-27 19:46:21.063 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....2024-01-27 19:46:21.241 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position2024-01-27 19:46:21.241 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status2024-01-27 19:46:22.484 [destination = example , address = /127.0.0.1:13306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=223344,gtid=<null>,timestamp=1706355127000] cost : 1231ms , the next step is binlog dump

关闭

sh bin/stop.sh

client exmaple

参见:https://github.com/alibaba/canal/wiki/ClientExample

下载 canal, 访问 release 页面 , 选择需要的包下载[4], 如以canal.example-1.1.7.tar.gz为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.example-1.1.7.tar.gz

解压

cd ~/canal/client/tar -zxvf canal.example-1.1.7.tar.gz

$ lsbin canal.example-1.1.7.tar.gz canal.example-1.1.7.tar.gz:Zone.Identifier conf lib logs

启动

sh startup.sh

直接看一下日志:

cd /home/houbinbin/canal/client/logs/exampletail -fn30 entry.log

插入数据

数据库插入数据:

insert into user_info (username) values ('u7');

对应的客户端日志:

================> binlog[mysql-bin.000001:4325] , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 335ms BEGIN ----> Thread id: 8----------------> binlog[mysql-bin.000001:4481] , name[test_source,user_info] , eventType : INSERT , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 335 msid : 6 type=int unsigned update=trueusername : u7 type=varchar(128) update=truecreate_time : 2024-01-27 19:59:21 type=timestamp update=trueupdate_time : 2024-01-27 19:59:21 type=timestamp update=true---------------- END ----> transaction id: 351================> binlog[mysql-bin.000001:4533] , executeTime : 1706356761000(2024-01-27 19:59:21) , gtid : () , delay : 336ms

小结

canal 的设计理念其实已经非常先进了,对于日常的 mysql cdc 完全够用。

实际使用时,可以在 exmaple 的基础之上,做自己的业务能力增强。

不过还有一些类似的更强大的设计,比如 Debezium-01-为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台[5]

References

[1]Canal:https://github.com/alibaba/canal

[2]binlog 开启:https://houbb.github.io/2021/08/29/mysql-binlog

[3]访问 release 页面 , 选择需要的包下载:https://github.com/alibaba/canal/releases/

[4]访问 release 页面 , 选择需要的包下载:https://github.com/alibaba/canal/releases/

[5]Debezium-01-为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台:https://houbb.github.io/2019/02/13/database-sharding-cdc-debezium

  • 发表于:
  • 原文链接https://page.om.qq.com/page/ONhzqvtQfz2ofPcx8wLMcqAQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券