canal [kə'næl] ("waterway/pipe/ditch") parses MySQL's incremental binlog to provide incremental data subscription and consumption; it does not sync pre-existing full data. Because it rides on the binlog mechanism, inserts, updates, and deletes in MySQL can be applied to Elasticsearch in real time.
Canal detects data changes through MySQL's binlog, so binlog must be enabled in MySQL beforehand.
Add the following under the [mysqld] section of /etc/my.cnf:
server-id = 7777
log_bin = mysql-bin
binlog_format = row
binlog_row_image = full
expire_logs_days = 10
Then restart MySQL so the binlog settings take effect:
systemctl restart mysqld.service
Check that binlog is enabled:
[root@mysql-5 ~]# mysqladmin variables -uroot -p123456 | grep log_bin
| log_bin | ON
Create a user canal with password canal and grant it MySQL replication (slave) privileges:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
Download Canal from: https://github.com/alibaba/canal/releases
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; everything else keeps its defaults:
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Match all tables; each table's changes are sent to a topic named after it.
# For example, database school with table student maps to topic school_student.
canal.mq.dynamicTopic=.*\\..*
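As a quick illustration, the topic-naming convention described in the comments above can be sketched like this (a sketch of the convention as stated in this article, not Canal's actual implementation):

```java
// Minimal sketch of the dynamic-topic naming rule: with
// canal.mq.dynamicTopic=.*\\..*, each table's changes go to a topic
// named "<database>_<table>".
public class DynamicTopicName {
    static String topicFor(String database, String table) {
        return database + "_" + table;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("school", "student")); // school_student
    }
}
```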
Edit the Canal configuration file (vim /usr/local/canal/conf/canal.properties); everything else keeps its defaults:
# canal server mode: tcp, kafka, rocketMQ, or rabbitMQ
canal.serverMode = kafka
# Kafka cluster addresses
kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092
Start the Canal server:
bin/startup.sh
Insert a row into the MySQL database:
mysql> insert into student values(8,'tonny',20);
Query OK, 1 row affected (0.01 sec)
A Kafka consumer can now consume the newly inserted row:
[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic school_student --from-beginning
{"data":[{"id":"8","name":"tonny","age":"20"}],"database":"school","es":1617512377000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"student","ts":1617512405579,"type":"INSERT"}
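A consumer usually only needs a few top-level fields of such a flat message, e.g. `database`, `table`, and `type`. Here is a minimal sketch that pulls them out with plain JDK string handling; a real consumer should use a proper JSON library such as Jackson, since this only handles the simple top-level `"name":"value"` case:

```java
// Minimal sketch: extract top-level string fields from a Canal flat message.
// Not a general JSON parser - nested fields and non-string values are ignored.
public class FlatMessagePeek {
    static String stringField(String json, String name) {
        String key = "\"" + name + "\":\"";
        int start = json.indexOf(key);
        if (start < 0) {
            return null; // field absent or not a plain string
        }
        start += key.length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        String msg = "{\"database\":\"school\",\"table\":\"student\",\"type\":\"INSERT\"}";
        System.out.println(stringField(msg, "database") + "." + stringField(msg, "table")
                + " -> " + stringField(msg, "type")); // school.student -> INSERT
    }
}
```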
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; everything else keeps its defaults:
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
Edit the Canal configuration file (vim /usr/local/canal/conf/canal.properties); everything else keeps its defaults:
# canal server mode: tcp, kafka, rocketMQ, or rabbitMQ
canal.serverMode = tcp
Start the Canal server:
bin/startup.sh
Edit the conf/application.yml file:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # Canal server address (the default set in conf/canal.properties above); keep the default
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  # Source database connection info
  srcDataSources:
    defaultDS:
      # jdbc:mysql://<MySQL address>:<port>/<database>?useUnicode=true
      url: jdbc:mysql://192.168.1.14:3306/school?useUnicode=true
      username: canal # username
      password: canal # password
  canalAdapters:
  - instance: example # canal instance name or MQ topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7 # reads the yml files under conf/es7
        hosts: 192.168.1.171:9300 # Elasticsearch address
        properties:
          mode: transport
          cluster.name: cr7-elastic # Elasticsearch cluster name
Edit the conf/es7/mytest_user.yml file. Note that the table's id column must be mapped to _id in Elasticsearch, otherwise a NullPointerException is thrown:
dataSourceKey: defaultDS # key of the source data source, matching srcDataSources above
destination: example # canal instance name, or the MQ topic name
groupId: g1 # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user # name of the index created in the Elasticsearch instance
  _id: _id # id of the documents synced to Elasticsearch; customizable, this article uses _id
  sql: "select t.id as _id,t.id,t.name,t.age from student t" # SQL selecting the fields to sync to Elasticsearch
  commitBatch: 3000 # commit batch size
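The effect of the `t.id as _id` alias can be sketched as follows. This is an assumption about the observable behavior (the `_id` column becomes the document id and is not stored as a source field, which matches the search result shown later in this article), not the adapter's actual code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: turn one SQL result row into an Elasticsearch document.
// The column aliased "_id" supplies the document id; the remaining
// columns become the document source.
public class EsMappingSketch {
    static Map<String, Object> toDocument(Map<String, Object> row) {
        Map<String, Object> doc = new LinkedHashMap<>(row);
        doc.remove("_id"); // used as the document id, not stored as a field
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("_id", 2);
        row.put("id", 2);
        row.put("name", "chengzw");
        row.put("age", 18);
        System.out.println("doc id = " + row.get("_id") + ", source = " + toDocument(row));
    }
}
```

Without the `_id` alias in the SQL, the adapter has no document id to write with, which is why the mapping file above insists on it.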
Start the canal-adapter:
bin/startup.sh
After a successful start, the canal-adapter log shows the following:
[root@canal canal-adapter]# tail -f /usr/local/canal-adapter/logs/adapter/adapter.log
# Output:
......
2021-04-04 13:32:58.625 [http-nio-8081-exec-10] INFO com.alibaba.otter.canal.adapter.launcher.rest.CommonRest - #Destination: example sync on
2021-04-04 13:32:58.631 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-04 13:32:58.677 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
......
Create the index mapping first; otherwise Canal cannot recognize the index and writes will fail with an error:
PUT mytest_user
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"age": {
"type": "integer"
}
}
}
}
Insert a row into the MySQL database:
mysql> insert into student values(2,'chengzw',18);
Query OK, 1 row affected (0.01 sec)
Check the canal-adapter log:
2021-04-04 13:36:12.022 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"name":"chengzw","age":18}],"database":"school","destination":"example","es":1617514543000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"student","ts":1617514571559,"type":"INSERT"}
Affected indexes: mytest_user
The new data can now be searched in Elasticsearch:
GET mytest_user/_search
# Response:
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "mytest_user",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"name" : "chengzw",
"age" : 18
}
}
]
}
}
If rows are deleted from MySQL, Canal also deletes the corresponding documents in Elasticsearch.
# List all canal instances subscribed for sync
[root@canal canal]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}]
# Turn data sync off
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}
# Turn data sync on
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
{"code":20000,"message":"实例: example 开启同步成功"}
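The same switch can be driven from code. The sketch below only builds the PUT URLs for the endpoints shown above (host/port 127.0.0.1:8081 and destination "example" are this article's values); actually sending the request needs a running adapter, so it is left as a comment using the JDK 11 HttpClient:

```java
// Sketch: URL construction for the canal-adapter sync-switch REST endpoints.
public class SyncSwitchClient {
    // PUT /syncSwitch/<destination>/on|off
    static String switchUrl(String hostPort, String destination, boolean on) {
        return "http://" + hostPort + "/syncSwitch/" + destination + (on ? "/on" : "/off");
    }

    public static void main(String[] args) {
        // URLs matching the curl commands above
        System.out.println(switchUrl("127.0.0.1:8081", "example", false));
        System.out.println(switchUrl("127.0.0.1:8081", "example", true));
        // With a running adapter, the PUT could be sent like this (JDK 11+):
        // HttpClient.newHttpClient().send(
        //         HttpRequest.newBuilder(URI.create(switchUrl("127.0.0.1:8081", "example", false)))
        //                 .PUT(HttpRequest.BodyPublishers.noBody()).build(),
        //         HttpResponse.BodyHandlers.ofString());
    }
}
```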
Edit the instance configuration file (vim conf/example/instance.properties). Only the following entries need to change; everything else keeps its defaults:
# MySQL connection info
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
Add the canal.client dependency to the project's pom.xml:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
package com.chengzw;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
/**
 * Canal client
 * @author 程治玮
 * @since 2021/4/4 12:31 PM
 */
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.1.175", 11111), "example", "", "");
        // Number of entries to fetch at a time
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            // Open the connection
            connector.connect();
            // Subscribe to all databases and all tables
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position, so the next fetch starts there
            connector.rollback();
            // Maximum number of consecutive empty fetches before disconnecting
            int totalEmptyCount = 2000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries()); // print the change data
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            // Print format: binlog[mysql-bin.000001:5430] , name[school,student] , eventType : INSERT
            // i.e. binlog file name, offset, database name, table name, operation
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            // Print the row data
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) { // deleted rows
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) { // inserted rows
                    printColumn(rowData.getAfterColumnsList());
                } else { // updated rows
                    System.out.println("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    // Print each column of a row, e.g.:
    // id : 8 update=false
    // name : tonny update=false
    // age : 20 update=false
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
Insert a row into the MySQL database:
mysql> insert into student values(14,'mali',20);
Query OK, 1 row affected (0.00 sec)
Console output:
empty count : 1
empty count : 2
empty count : 3
================ binlog[mysql-bin.000001:12924] , name[school,student] , eventType : INSERT
id : 14 update=true
name : mali update=true
age : 20 update=true