首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Canal 原理与实践

Canal 原理与实践

作者头像
Se7en258
发布2021-05-18 11:03:41
9020
发布2021-05-18 11:03:41
举报
文章被收录于专栏:Se7en的架构笔记Se7en的架构笔记

Canal 简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,不支持全量已有数据同步。由于采用了 binlog 机制,Mysql 中的新增、更新、删除操作,对应的 Elasticsearch都能实时新增、更新、删除。

MySQL 主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)。
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。

Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

MySQL 配置

开启 binlog

Canal 使用 MySQL 的 binlog 机制实现数据动态变化监测,所以需要 Mysql 提前配置 binlog。

编辑 /etc/my.cnf 的 mysqld 下添加如下配置:

server-id         = 7777
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

然后,重启一下 Mysql 以使得 binlog 生效。

systemctl restart mysqld.service

检查 binlog 是否开启:

[root@mysql-5 ~]# mysqladmin variables -uroot@123456 | grep log_bin
| log_bin                                                | ON                                                                                                                                                                            

创建用户

创建用户 canal,密码 canal,并授予 MySQL slave 的权限:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下载 Canal 压缩包

下载地址: https://github.com/alibaba/canal/releases

场景一:同步 MySQL 数据到 Kafka

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 匹配所有表,则每个表都会发送到各自表名的topic
# 例如数据库名为school,表名为student,则topic名字为school_student
canal.mq.dynamicTopic=.*\\..*

修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:

# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka

#Kafka集群地址
kafka.bootstrap.servers = 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092

启动 canal server:

bin/startup.sh

Kafka 消费数据

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(8,'tonny',20);
Query OK, 1 row affected (0.01 sec)

Kafka 消费者可以成功消费到新插入的数据:

[root@kafka1 ~]# kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic student_school --from-beginning 
{"data":[{"id":"8","name":"tonny","age":"20"}],"database":"school","es":1617512377000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"student","ts":1617512405579,"type":"INSERT"}

场景二:同步 MySQL 数据到 Elasticsearch

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

修改 canal 配置文件 vim /usr/local/canal/conf/canal.properties,其余内容保持默认即可:

# canal模式,有tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp

启动 canal server:

bin/startup.sh

Canal-adapter 配置

修改启动器配置

修改conf/application.yml文件:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # Canal Server的访问地址(在前面的conf/canal.properties中默认配置了),保持默认
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  # 连接的数据库信息
  srcDataSources:
    defaultDS:
    # jdbc:mysql://<MySQL地址>:<端口>/<数据库名称>?useUnicode=true
      url: jdbc:mysql://192.168.1.14:3306/school?useUnicode=true
      username: canal  #用户名
      password: canal  #密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1 
      outerAdapters:
      - name: es7 #去读取的conf/es7目录下的yml文件
        hosts: 192.168.1.171:9300 #Elasticsearch地址
        properties:
          mode: transport 
          cluster.name: cr7-elastic  #Elasticsearch集群名字
编辑适配器表映射文件

编辑 conf/es7/mytest_user.yml文件,注意指定对应库表 id 为 Elasticsearch中 的 _id,否则会空指针异常:

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example #cannal的instance或者MQ的topic 
groupId: g1 #对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user  #在Elasticsearch实例中所创建的索引的名称
  _id: _id #需要同步到Elasticsearch实例的文档的id,可自定义。本文使用_id
  sql: "select t.id as _id,t.id,t.name,t.age from student t"  #SQL语句,用来查询需要同步到Elasticsearch中的字段
  commitBatch: 3000  #提交批大小

启动 canal-adapter 启动器:

bin/startup.sh

启动成功后查看 canal adapter 日志会有如下内容:

[root@canal canal-adapter]# tail -f /usr/local/canal-adapter/logs/adapter/adapter.log 

#输出结果:
......
2021-04-04 13:32:58.625 [http-nio-8081-exec-10] INFO  com.alibaba.otter.canal.adapter.launcher.rest.CommonRest - #Destination: example sync on
2021-04-04 13:32:58.631 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-04 13:32:58.677 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
......

Elasicsearch 获取数据

先创建索引对应的 mapping,否则 canal 会无法识别索引,会报写入错误:


PUT mytest_user
{
  "mappings": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "age": {
          "type": "integer"
        }
      }
  }
}

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(2,'chengzw',18);
Query OK, 1 row affected (0.01 sec)

查看 canal-adapter 日志:

2021-04-04 13:36:12.022 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"name":"chengzw","age":18}],"database":"school","destination":"example","es":1617514543000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"student","ts":1617514571559,"type":"INSERT"} 
Affected indexes: mytest_user 

查看 Elasticsearch 可以搜索到新的数据:

GET mytest_user/_search

#返回结果:
{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "mytest_user",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "name" : "chengzw",
          "age" : 18
        }
      }
    ]
  }
}

如果删除 MySQL 数据库数据,Canal 也会将对应 Elasticsearch 上的文档删除。

Canal-adapter 管理 REST 接口

# 查询所有订阅同步的canal instance
[root@canal canal]# curl http://127.0.0.1:8081/destinations
[{"destination":"example","status":"on"}]

# 数据同步关闭
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
{"code":20000,"message":"实例: example 关闭同步成功"}

# 数据同步开启
[root@canal canal]# curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT
{"code":20000,"message":"实例: example 开启同步成功"}

场景三:通过 Java API 获取 Canal 数据

Canal Server 配置

修改 instance 配置文件 vim conf/example/instance.properties,只需要修改以下内容,其余保存默认即可:

#MySQL数据库连接信息
# position info
canal.instance.master.address=192.168.1.14:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

引入 maven 依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Java 代码

package com.chengzw;

import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


/**
 * Canal 客户端
 * @author 程治玮
 * @since 2021/4/4 12:31 下午
 */
public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.175",
                11111), "example", "", "");
        //一次性获取数据的数量
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            //打开连接
            connector.connect();
            //订阅全部数据库和全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            //指定连续获取空数据最大的次数,达到这个次数断开连接
            int totalEmptyCount = 2000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries()); //获取数据信息
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            //打印格式:binlog[mysql-bin.000001:5430] , name[school,student] , eventType : INSERT
            ///                  binlog文件名,偏移量,数据库名,表名,操作
            System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            //打印数据
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {  //删除数据
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {  //插入数据
                    printColumn(rowData.getAfterColumnsList());
                } else {  //更新数据
                    System.out.println("------- before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("------- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }


     // 打印数据每个字段的更新情况
     // id : 8    update=false
     // name : tonny    update=false
     // age : 20    update=false
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

往 MySQL 数据库中插入一条数据:

mysql> insert into student values(14,'mali',20);
Query OK, 1 row affected (0.00 sec)

查看控制台输出结果:

empty count : 1
empty count : 2
empty count : 3
================ binlog[mysql-bin.000001:12924] , name[school,student] , eventType : INSERT
id : 14    update=true
name : mali    update=true
age : 20    update=true

参考链接

  • https://github.com/alibaba/canal/wiki/ClientAdapter
  • https://github.com/alibaba/canal/wiki/Sync-ES
  • https://github.com/alibaba/canal/wiki/ClientExample
  • https://mp.weixin.qq.com/s/YKqKW0n5JTPgTd9kv9RDhQ
  • https://help.aliyun.com/document_detail/135297.html#title-3b7-i1b-4n3
  • https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Se7en的架构笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Canal 简介
  • MySQL 主备复制原理
  • Canal 工作原理
  • MySQL 配置
    • 开启 binlog
      • 创建用户
        • 下载 Canal 压缩包
        • 场景一:同步 MySQL 数据到 Kafka
          • Canal Server 配置
            • Kafka 消费数据
            • 场景二:同步 MySQL 数据到 Elasticsearch
              • Canal Server 配置
                • Canal-adapter 配置
                  • 修改启动器配置
                  • 编辑适配器表映射文件
                • Elasicsearch 获取数据
                  • Canal-adapter 管理 REST 接口
                  • 场景三:通过 Java API 获取 Canal 数据
                    • Canal Server 配置
                      • 引入 maven 依赖
                        • Java 代码
                        • 参考链接
                        相关产品与服务
                        云数据库 MySQL
                        腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档