本文将从三个方面深度剖析 EventParser 组件。
温馨提示:本篇篇幅较长,如果耐心阅读一定会有不错的收获,为了提高阅读体验,本文所有源码都是通过截图方式,大家可以重点阅读对应的文字说明,并在文末进行了总结。
首先我们先从官方文档来看 EventParser 的整体设计,其架构设计图如下所示:
上述图罗列出了 EventParser 的整体工作流程图,其关键步骤如下:
官方文档有助于理解 EventParser 组件的实现原理,但关于如何使用 EventParser 的篇幅较少,故接下来将从源码的角度来反推 EventParser 的特性以及详细的工作实现原理,以便指导我们如何更好的使用 EventParser。
从上篇文章我们即可得知,EventParser 组件是 Canal Instance 的四大核心组件之一,那本节的故事就从 CanalInstanceWithManager 的 initEventParser 方法开始。
CanalInstanceWithManager#initEventParserStep1:获取数据库的连接信息,上面的代码就是集合的基本操作,但从上面的代码可以窥探如何配置数据库相关的地址信息。配置 canal instance 中 数据库的地址,用户名密码有如下几种方式(CanalParamter):
温馨提示:这里的用户名与密码是在对应服务器用于进行 binlog 日志同步的账号信息。
关于多库场景的配置,再详细举例如下:
其对应的初始化代码如下:
CanalInstanceWithManager#initEventParser
Step2:根据配置的 MySQL 构建 EventParser 实例。这里有如下几个关键点:
接下来我们将重点查看 doInitEventParser 的实现细节。
CanalInstanceWithManager#doInitEventParserStep3:从这里可以看出 Canal 目前并不支持 Oracle 数据,只支持 MySQL 与 本地 binlog 文件(直接根据 binlog 日志文件解析)。
温馨提示:接下来将重点探讨基于 MySQL binlog 日志,并且会忽略与阿里云相关的 RDS 、tsdb 等数据库辅助支持,只关系与开源 MySQL 相关的处理逻辑。
CanalInstanceWithManager#doInitEventParser Step3:MySQL 的 binlog 事件解析器实现类为 MysqlEventParser,这里我们重点来阐述一下这些参数的含义:
CanalInstanceWithManager#doInitEventParser Step4:如果设置了 CanalPrameter 的 Listpositions 属性,则将其解析为 EntryPosition 实体,我们来看一下如何表征 binlog 日志的位点信息。
其主要的核心参数如下:
温馨提示:实践指导,CanalParameter 的 List< String> positions 不支持组模式,只能设置一组,即第一个元素为主,第二个元素可以为从节点,该属性非必填。
CanalInstanceWithManager#doInitEventParserStep5:继续设置参数,具体看一下各个参数的含义:
CanalInstanceWithManager#doInitEventParserStep6:继续填充解析器相关参数,其重点实现如下:
CanalInstanceWithManager#doInitEventParser Step7:如果解析器是 MySQL 解析器,提供了 HA 机制,即如果 MySQL Master 宕机,Canal 还能主动切换到 MYSQL Slave 节点,继续同步 binlog 日志。
上面已经详细介绍了EventParser 的初始化过程,有助于大家对 CanalInstance 相关配置参数的理解,本节将相信介绍 EventParser 的工作流程,其实现代码入口为 EventParser 的 start 方法。本文重点将探究 MySQL binlog 日志的解析,故其实现类为:MysqlEventParser。
MysqlEventParser 的 start 方法代码如下:
主要调用的是其父类的 start 方法。接下来对其进行详细解读。
AbstractEventParser#startStep1:创建环形缓存区,其主要的作用是 Canal 在解析 binlog 日志后,会尽量尝试将一个数据库事务所产生的全部变更日志(一个事务所有变更数据)当成一个整体提交给 EventSink 组件,从而 Canal 的消费方能一次将一个事务的数据全部同步,数据的完整性得到了保证。
温馨提示:关于环形缓存区的具体实现细节将在下文详细介绍,这里先简单说一下 Canal 目前无法百分之百保证一个事务的数据就一定是一次消费,如果一个事务产生的变更日志超过了环形缓存区的容量,则会被强制提交消费,一个事务的数据会被分开消费,默认环形缓存区的长度为 1024.
AbstractEventParser#startStep2:构建一个 binlog 解析器,该方法在 AbstractEventParser 中为一个抽象方法,具体的实现在其子类中,其代码截图如下:AbstractMysqlEventParser,在 MySQL binlog 解析的实现类为 LogEventConvert,所处的模块为 parse,该部分是整个 Canal EventParser 的核心,将在后续文章中单独详细介绍。
AbstractEventParser#start Step3:启动一个独立的线程来负责 binlog 的解析,其线程包含了 Canal Instance 的 destination、address 等信息,方便利用 jstack 去诊断 binlog 解析相关问题。接下来就是解读该线程的 run 方法,从而探究 binlog 的解析流程。
AbstractEventParser#startStep31:首先创建一条到需要解析 binlog 日志的服务器,例如需要同步 192.168.1.166:3306 这个数据库实例的 binlog 日志,那 Canal 首先会使用拥有该库复制权限的账号去创建一条TCP连接,本文并不会详细去介绍这里的实现细节,这里代表一个领域,即需要知晓 MySQL 通讯协议,通过TCP与MySQL建立连接,并按照 MySQL 通讯协议发送命令,例如 select、dump 等请求,这个后续在学完 Canal 等核心组件后,可能会深入学习该部分的内容,这里我重点点出其实现的几个关键要点:
AbstractEventParser#startStep32:发送心跳包,这里的关键实现点如下:
AbstractEventParser#startStep33:执行发送 dump 命令正式从 MySQL 服务器接收 binlog 日志之前的准备工作,具体准备工作如下:
扩展阅读:binlog_format 我相信大家都不陌生,对 binlog_row_image 见过的估计比较少,那 binlog_row_image 有何作用呢?
binlog_row_image 主要是在 binlog_format 为 ROW 模式下,控制记录 binlog 事件的方式,binlog 的作用是记录数据的变化,例如 update 请求,需要记录一行记录变化之前的数据以及变化后的数据,在 binlog event 分别用 before 、after 记录变化前后的数据,但有一个问题,是只发生变化的字段的前后值呢,还是记录一行中所有字段修改前后的值呢?故引入了 binlog_row_image,该值支持如下选项:
AbstractEventParser#startStep34:向 MySQL 服务端发送 show variables like 'server_id' 语句,查询服务端配置的 serverId。
AbstractEventParser#startStep35:通过日志位点管理器获取需要同步的位点,后续会详细展开。
AbstractEventParser#startStep36:通过向 MySQL 发送 dump 请求,从服务器接收 binlog 日志,并进行处理,为了提高性能,Canal 支持该过程进行并行化处理,通过 parallel 属性设置是否支持并发,从而引入 disruptor 高性能并发框架,详情后在后续文章中详细解读。
AbstractEventParser#startStep37:通过接收到 MySQL 服务端返回的日志并解析为 Canal.Entry 对象,并传输到 EventSink 组件。
上述过程反复执行,持续完成 binlog 日志的解析,实现数据的同步。
本文首先结合官方文档了解了 EventParser,但 Canal 的官方手册并不特别详细,故需要我们通过源码去反推 canal instance 中关于 EventParser 有哪些参数,并且这些参数有何意义,是如何工作的。
众所周知,EventParser 的主要职责就是与 MySQL 服务器“打交道”,将自己伪装成 MySQL 服务器的一个从节点,从服务器端接收 binlog 日志,并将二进制流解码成 Canal.Entry,看似简单,但实现起来还是比较困难的,下面这些方面是后续值得我们研究探讨的点: