专栏首页大数据成神之路使用canal-kafka实现数据库增量实时更新

使用canal-kafka实现数据库增量实时更新

安装canal

下载安装包: https://github.com/alibaba/canal/releases canal.kafka-1.1.0.tar.gz

解压到固定目录:

tar -zxvf canal.kafka-1.1.0.tar.gz

修改配置

vi conf/example/instance.properties
#数据库地址
canal.instance.master.address=192.168.56.104:3306

#数据库用户密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

#数据库字符集
canal.instance.connectionCharset=UTF-8

#默认数据库
canal.instance.defaultDatabaseName =zgai_db

vim /usr/local/canal/conf/canal.properties

#canalid
canal.id= 1

#canal地址
canal.ip=192.168.56.102

#zookeeper地址
canal.zkServers=192.168.56.102:2181

vim /usr/local/canal/conf/kafka.yml

#kafka地址
servers: 192.168.56.101:9092

# canal的批次大小,单位 k
canalBatchSize: 50

#topic
    topic: mytopic

配置详细介绍

canal.properties

canal配置主要分为两部分定义:

1. instance列表定义 (列出当前server上有多少个instance,每个instance的加载方式是spring/manager等)

参数名字

参数说明

默认值

canal.destinations

当前server上部署的instance列表

canal.conf.dir

conf/目录所在的路径

../conf

canal.auto.scan

开启instance自动扫描如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:a. instance目录新增:触发instance配置载入,lazy为true时则自动启动b. instance目录删除:卸载对应instance配置,如已启动则进行关闭c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作

true

canal.auto.scan.interval

instance自动扫描的间隔时间,单位秒

5

canal.instance.global.mode

全局配置加载方式

spring

canal.instance.global.lazy

全局lazy模式

false

canal.instance.global.manager.address

全局的manager配置方式的链接信息

canal.instance.global.spring.xml

全局的spring配置方式的组件文件

classpath:spring/file-instance.xml(spring目录相对于canal.conf.dir)

canal.instance.example.modecanal.instance.example.lazycanal.instance.example.spring.xml…..

instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式命名规则:canal.instance.{name}.xxx

canal.instance.tsdb.spring.xml

v1.0.25版本新增,全局的tsdb配置方式的组件文件

classpath:spring/tsdb/h2-tsdb.xml(spring目录相对于canal.conf.dir)

2. common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.

【instance.properties配置定义优先级高于canal.properties】

参数名字

参数说明

默认值

canal.id

每个canal server实例的唯一标识,暂无实际意义

1

canal.ip

canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务

canal.port

canal server提供socket服务的端口

11111

canal.zkServers

canal server链接zookeeper集群的链接信息例子:127.0.0.1:2181,127.0.0.1:2182

canal.zookeeper.flush.period

canal持久化数据到zookeeper上的更新频率,单位毫秒

1000

canal.file.data.dir

canal持久化数据到file上的目录

../conf (默认和instance.properties为同一目录,方便运维和备份)

canal.file.flush.period

canal持久化数据到file上的更新频率,单位毫秒

1000

canal.instance.memory.batch.mode

canal内存store中数据缓存模式1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小

MEMSIZE

canal.instance.memory.buffer.size

canal内存store中可缓存buffer记录数,需要为2的指数

16384

canal.instance.memory.buffer.memunit

内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小

1024

canal.instance.transactionn.size

最大事务完整解析的长度支持超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性

1024

canal.instance.fallbackIntervalInSeconds

canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢

60

canal.instance.detecting.enable

是否开启心跳检查

false

canal.instance.detecting.sql

心跳检查sql

insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.interval.time

心跳检查频率,单位秒

3

canal.instance.detecting.retry.threshold

心跳检查失败重试次数

3

canal.instance.detecting.heartbeatHaEnable

心跳检查失败后,是否开启自动mysql自动切换说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据

false

canal.instance.network.receiveBufferSize

网络链接参数,SocketOptions.SO_RCVBUF

16384

canal.instance.network.sendBufferSize

网络链接参数,SocketOptions.SO_SNDBUF

16384

canal.instance.network.soTimeout

网络链接参数,SocketOptions.SO_TIMEOUT

30

canal.instance.filter.query.dcl

是否忽略DCL的query语句,比如grant/create user等

false

canal.instance.filter.query.dml

是否忽略DML的query语句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query记录)

false

canal.instance.filter.query.ddl

是否忽略DDL的query语句,比如create table/alater table/drop table/rename table/create index/drop index. (目前支持的ddl类型主要为table级别的操作,create databases/trigger/procedure暂时划分为dcl类型)

false

canal.instance.filter.druid.ddl

v1.0.25版本新增,是否启用druid的DDL parse的过滤,基于sql的完整parser可以解决之前基于正则匹配补全的问题,默认为true

true

canal.instance.get.ddl.isolation

ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回.(otter ddl同步使用)

false

instance.properties介绍:

a. 在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件

比如:

canal.destinations = example1,example2

这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.

ps. canal自带了一份instance.properties demo,可直接复制conf/example目录进行配置修改

cp -R example example1/
cp -R example example2/

b. 如果canal.properties未定义instance列表,但开启了canal.auto.scan时

  • server第一次启动时,会自动扫描conf目录下,将文件名做为instance name,启动对应的instance
  • server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描 1. 发现目录有新增,启动新的instance 2. 发现目录有删除,关闭老的instance 3. 发现对应目录的instance.properties有变化,重启instance

instance.properties参数列表:

参数名字

参数说明

默认值

canal.instance.mysql.slaveId

mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一

1234

canal.instance.master.address

mysql主库链接地址

127.0.0.1:3306

canal.instance.master.journal.name

mysql主库链接时起始的binlog文件

canal.instance.master.position

mysql主库链接时起始的binlog偏移量

canal.instance.master.timestamp

mysql主库链接时起始的binlog的时间戳

canal.instance.dbUsername

mysql数据库帐号

canal

canal.instance.dbPassword

mysql数据库密码

canal

canal.instance.defaultDatabaseName

mysql链接时默认schema

canal.instance.connectionCharset

mysql 数据解析编码

UTF-8

canal.instance.filter.regex

mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)常见例子:1. 所有表:.* or .*\\..*2. canal schema下所有表: canal\\..*3. canal下的以canal打头的表:canal\\.canal.*4. canal schema下的一张表:canal.test15. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

.*\\..*

canal.instance.tsdb.enable

v1.0.25版本新增,是否开启table meta的时间序列版本记录功能

true

canal.instance.tsdb.dir

v1.0.25版本新增,table meta的时间序列版本的本地存储路径,默认为instance目录

canal.file.data.dir:../conf/canal.file.data.dir:../conf/{canal.instance.destination:}

canal.instance.tsdb.url

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接串,比如例子为本地嵌入式数据库

jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接账号

canal

canal.instance.tsdb.dbUsername

v1.0.25版本新增,table meta的时间序列版本存储的数据库链接密码

canal

几点说明:

1. mysql链接时的起始位置

  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

2. mysql解析关注表定义

  • 标准的Perl正则,注意转义时需要双斜杠:\\

3. mysql链接的编码

  • 目前canal版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过filter.regex配置,将其拆分为多个canal instance,为每个instance指定不同的编码

instance.xml配置文件

目前默认支持的instance.xml有以下几种:

  • spring/memory-instance.xml
  • spring/file-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

在介绍instance配置之前,先了解一下canal如何维护一份增量订阅&消费的关系信息:

  • 解析位点 (parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager)
  • 消费位点 (canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalMetaManager)

对应的两个位点组件,目前都有几种实现:

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • file (file-instance.xml中使用,集合了file+memory模式,先写内存,定时刷新数据到本地file上)
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

memory-instance.xml介绍:

所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

特点:速度最快,依赖最少(不需要zookeeper)

场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境

file-instance.xml介绍:

所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.

特点:支持单机持久化

场景:生产环境,无HA需求,简单可用.

default-instance.xml介绍:

所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.

特点:支持HA

场景:生产环境,集群化部署.

group-instance.xml介绍:

主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。

场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.

instance.xml设计初衷:

允许进行自定义扩展,比如实现了基于数据库的位点管理后,可以自定义一份自己的instance.xml,整个canal设计中最大的灵活性在于此

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spring 框架基础(02):Bean的生命周期,作用域,装配总结

    上面使用 ComponentScan 注解,也可在配置文件进行统一的配置,效果相同,还简化代码。

    知了一笑
  • 记一次某XX远程接访系统漏洞后台getshell

    我这里做的第一步是做敏感目录扫描(自己的特有字典),跑出一处某程序控制台登入界面(尝试弱口令进入)

    天钧
  • Hystrix Dashboard:断路器执行监控

    Hystrix提供了Hystrix Dashboard来实时监控HystrixCommand方法的执行情况。Hystrix Dashboard可以有效地反映出每...

    macrozheng
  • Javaweb设置session过期时间

    在Java Web开发中,Session为我们提供了很多方便,Session是由浏览器和服务器之间维护的。Session超时理解为:浏览器和服务器之间创建了一个...

    walking在cloud.tencent
  • xml是啥?是干啥用的?

    XML,Extensible Markup Language,扩展性标识语言。文件的后缀名为:.xml。就像HTML的作用是显示数据,XML的作用是传输和存储数...

    用户4372098
  • Tomcat8访问管理页面localhost出现:403 Access Denied

    仔细一看,已经提示我们的context.xml中没有配置权限,此时我们只需要用最高效的解决办法就是

    Arebirth
  • 深入浅出-XXE漏洞

    写这篇的主要目的是因为很多CTFer还有一些安全人员不是很清楚xxe漏洞,还有在面试当中,xxe漏洞也经常被问到,所以就写这么一篇文章来学习xxe漏洞. 本篇会...

    徐焱
  • 几行代码实现小程序云开发提现功能

    纯云开发实现,下面说使用步骤: 一:开通商户的企业付款到领取功能 说明地址:

    许坏
  • WebGoat靶场系列---AJAX Security(Ajax安全性)

    Ajax 即” Asynchronous Javascript And XML”(异步 JavaScript 和 XML),是指一种创建交互式网页应用的网页开发...

    徐焱
  • WebGoat靶场系列---Access Control Flaws(访问控制缺陷)

    访问控制缺陷,未对通过身份验证的用户实施恰当的访问控制。攻击者可以利用这些缺陷访问未经授权的功能或数据,访问控制缺陷有基于角色的,也有基于路径的,例如:访问其...

    徐焱

扫码关注云+社区

领取腾讯云代金券