这是学习笔记的第 2142 篇文章
在Binlog解析方向和数据流转方向上,经常会提到比较有名的几类工具,阿里的Canal,Zendesk的Maxwell和Yelp的mysql_streamer,我们来简单说一下Maxwell。
在功能完善性和生态建设上,Canal和Zendesk整体的表现要好一些,它们都是基于Java开发,支持多种模式的数据上下游集成,如果是想快速上手,Maxwell是一个不错的选择,而mysql_streamer的维护时间在2017年左右,在行业里看到的案例相对要少。
Maxwell相对比较精巧,它能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,这一点是我优先考虑Maxwell的首要原因,当然它也可以作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。如果说使用场景,它的常见应用场景有ETL、维护缓存、收集表级别的DML指标、增量到搜索引擎、数据分区迁移等。
我们可以快速通过一个部署测试的过程来快速熟悉Maxwell,整个测试基于云主机环境。
首先要安装JDK和maven
可以快速验证:
# java -version openjdk version "1.8.0_232" # mvn -v Apache Maven 3.0.5 (Red Hat 3.0.5-17)
其中对于maven源,可以考虑云服务的配置,如果是默认的maven源在后续编译安装的时候会有些麻烦。
在此演示的是通过获得Maxwell的源码编译得到安装文件来进行说明的,因为GitHub的下载文件较大,下载网速较慢,我们使用Maxwell的源码文件编译安装,这个项目代码量相对比较小,所以比较便捷,也方便后续做一些改动和调试。
下载源码包的链接是:,
wget https://github.com/zendesk/maxwell/archive/v1.23.2.tar.gz
解压进入安装目录后,执行make package命令,即可开始整个工具的编译安装。
maven源如果使用默认的,整个编译过程会比较长。
[INFO] BUILD SUCCESS[INFO] Total time: 2:21.275s[INFO] Finished at: Thu Oct 24 18:38:30 CST 2019[INFO] Final Memory: 48M/182M
而如果使用云主机服务范围内的maven源,速度就快了好多。
[INFO] Total time: 48.574s[INFO] Finished at: Thu Oct 24 19:08:50 CST 2019[INFO] Final Memory: 49M/186M
编译完成之后在母target下面就有编译生成的
我们来做下数据库的初始化, 可以参考如下的文章快速完成数据库环境的部署。
然后创建数据库相关用户和权限配置
主要有复制相关的权限,在解析的过程中,Maxwell会把自己包装成一个Slave,然后进行数据通信,当然这个过程
CREATE USER 'maxwell'@'%' identified by 'XXXXXX';GRANT ALL on maxwell.* to 'maxwell'@'%' ;GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';FLUSH PRIVILEGES;
为了方便测试和降低复杂度,我们先不做Kafka的数据流转测试,而是基于标准JSON输出做一些简单的分析。
接下来就可以直接调用Maxwell来做一些测试了,在此我没有使用配置文件,而是使用了大部分默认选项,使用标准输出模式。
bin/maxwell --user='maxwell' --password='XXXXXX' --host=127.0.0.1 --port=33071 --producer=stdout
创建表输出的JSON信息如下,它是完全按照SQL类型定义的不同的JSON结构。
18:45:35,946 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[binlog.000006:32499], lastHeartbeat=1571913934970] after applying "create table test_data(id int,name varchar(30))" to test, new schema id is 2{"database":"test","table":"test_data","type":"insert","ts":1571913951,"xid":811,"commit":true,"data":{"id":1,"name":"aa"}}
我们测试下手工开启事务后,是否在解析中能够正常解析出事务的信息。
mysql> begin;Query OK, 0 rows affected (0.00 sec)
mysql> insert into test_data values(2,'bb');Query OK, 1 row affected (0.00 sec)
mysql> insert into test_data values(3,'cc'); Query OK, 1 row affected (0.00 sec)
mysql> commit;Query OK, 0 rows affected (0.03 sec)
解析的结果如下,可以看到xid的基本信息。
{"database":"test","table":"test_data","type":"insert","ts":1571914005,"xid":835,"xoffset":0,"data":{"id":2,"name":"bb"}}{"database":"test","table":"test_data","type":"insert","ts":1571914009,"xid":835,"commit":true,"data":{"id":3,"name":"cc"}}
删除数据,如果影响行数是多行,会基于行模式解析
mysql> delete from test_data where id>1;Query OK, 2 rows affected (0.01 sec)
解析日志如下:
{"database":"test","table":"test_data","type":"delete","ts":1571914621,"xid":1021,"xoffset":0,"data":{"id":2,"name":"bb"}}{"database":"test","table":"test_data","type":"delete","ts":1571914621,"xid":1021,"commit":true,"data":{"id":3,"name":"cc"}}
update数据,会显示变更前和变更后的数据情况。
mysql> update test_data set name='aaa' where id=1;Query OK, 1 row affected (0.01 sec)Rows matched: 1 Changed: 1 Warnings: 0
显示日志如下:
{"database":"test","table":"test_data","type":"update","ts":1571914754,"xid":1067,"commit":true,"data":{"id":1,"name":"aaa"},"old":{"name":"aa"}}
在Maxwell的实现中,有很多定制化的配置,比如默认会创建一个maxwell命名的数据库,当然也可以指定多种选型进行配置管理,然后在这个数据库下面配置一些表,这方面的内容我们随后的文章会展开来进行分析。
mysql> show tables;+-------------------+| Tables_in_maxwell |+-------------------+| bootstrap || columns || databases || heartbeats || positions || schemas || tables |+-------------------+7 rows in set (0.00 sec)
从昨天下班前快速测试Maxwell的方案,从部署到基本验证,整个过程不超过30分钟,整体来说已经比较快捷了。
近期热文: