Canal入门

背景

以下内容来自canal官网介绍

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

数据库镜像

数据库实时备份

多级索引 (卖家和买家各自分库索引)

search build

业务cache刷新

价格变化等重要业务消息

项目介绍

名称:canal [kə’næl]

译意:水道/管道/沟渠

语言:纯java开发

定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql

关键词:mysql binlog parser / real-time / queue&topic

Canal的工作原理

mysql主备复制实现

从上层来看,复制分成三步:

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

编写客户端程序

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");①
    int batchSize = 1000;②
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*"); ③
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

}

说明:

① 指定ip和端口号

② 指定一次抓取的条数

③ 指定要监控的表,这里会覆盖配置文件

启动之后会看到如下信息

empty count : 1

empty count : 2

empty count : 3

empty count : 4

然后试着修改数据库,再观察控制台打印是否变化。

源码分析

canal服务端主要是获取binlog信息,canal客户端是负责把获取到的信息推送到不同的下游。我们将从了解canal的开始一步一步解析它。

需要注意,这里只分析canal客户端源码,并非canal-kafka源码。canal-kafka是将kafka作为客户端嵌入到canal里的,并且是直接将信息转成ByteString发送到kafka。

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

从示例中的程序我们可以发现,客户端getWithoutAck获取消息是批量拉取的方式,每次决定了要拉取多少条。这种形式类似kafka的consumer,这样整个客户端的处理数据能力就直接决定了canal的消费速度。

这里还需要说明getWithoutAck返回的是一个Message对象。这个Message是一个包含了一次请求返回多条数据的一个集合。

package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable {

    private static final long serialVersionUID = 1234034768477580009L;

    private long id;
    private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();

我们可以看到每个Message有一个id,entries中包含的就是我们请求返回的数据,类型是CanalEntry.Entry。 这里需要说明一下,在canal-kafka源码中Message类和canal中的Message是不一样的,主要是增加了两个属性。

package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable{

    private static final long serialVersionUID = 1234034768477580009L;

    private long id;
    @Deprecated
    private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
    // row data for performance, see:
    // https://github.com/alibaba/canal/issues/726
    private boolean raw = true;
    private List<ByteString> rawEntries = new ArrayList<ByteString>();

在canal-kafka中,Message最后是转换成ByteString发送到kafka的,这里是针对kafka场景使用。

package com.alibaba.otter.canal.kafka.producer;
...
  for (Topic topic : destination.getTopics()) {
         canalKafkaProducer.send(topic, message); // 发送message到所有topic
    }

Entry中包含

Header  
    logfileName [binlog文件名]  
    logfileOffset [binlog position]  
    executeTime [binlog里记录变更发生的时间戳,精确到秒]  
    schemaName   
    tableName  
    eventType [insert/update/delete类型]  
entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
storeValue  [byte数据,可展开,对应的类型为RowChange]
 System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

对于数据内容,字段信息需要解析成rowChage获取。

rowChage = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChage.getEventType();
rowChage.getBeforeColumnsList();
rowChage.getAfterColumnsList();
RowChange
   isDdl         [是否是ddl变更操作,比如create table/drop table]
   sql           [具体的ddl sql]
   rowDatas      [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
   beforeColumns [Column类型的数组,变更前的数据字段]
   afterColumns  [Column类型的数组,变更后的数据字段]

columns中的需要循环rowChage.getAfterColumnsList()获取

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

欢迎点赞+收藏+转发朋友圈素质三连

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏bigsai

从100到1000万高并发的架构演进之路

本文以设计淘宝网的后台架构为例,介绍从一百个并发到千万级并发情况下服务端的架构的14次演进过程,同时列举出每个演进阶段会遇到的相关技术,让大家对架构的演进有一...

22040
来自专栏TeamsSix的网络空间安全专栏

手工SQL注入语句构造

一提到SQL语句就想到了期末数据库考试的时候,那俩监考老师,哎,不说了,还好咱们数据库老师大发慈悲

16140
来自专栏架构专题

由 Canal 组件分析集成中间件架构的一般过程

为什么要做数据同步?因为数据很多,还要共享或做它用。举个栗子,你从移动硬盘拷贝一份小小电影到你的 Macbook 上赏析,也叫 数据同步。但系统不比你的单纯,它...

14640
来自专栏架构专题

这么多监控组件,总有一款适合你

监控是分布式系统的必备组件,能够起到提前预警、问题排查、评估决策等功效,乃行走江湖、居家必备之良品。

68430
来自专栏生信技能树

就想把表达矩阵区分成为蛋白编码基因和非编码有这么难吗?

考核题的文章里面是自己测了8个TNBC病人的转录组然后分析,这里借助TCGA数据库,所以可以复现。我这里想展现的主要是TCGA的数据下载和基因的ID转换,分类,...

14930
来自专栏信息化漫谈

客户将数据库迁移上云的常用办法

最近成功中标一个国内重大酒业集团的公有云项目,因客户自身的IT人员紧张,客户提出要求将应用、数据库的迁移上云作为中标方的服务内容之一。以前,经常...

11930
来自专栏MYSQL轻松学

MGR 的主要优点

MGR(Mysql Group Replication)是5.7版本新加的特性,是一个MySQL插件。

25650
来自专栏bigsai

xml是啥?是干啥用的?

XML,Extensible Markup Language,扩展性标识语言。文件的后缀名为:.xml。就像HTML的作用是显示数据,XML的作用是传输和存储数...

9620
来自专栏架构专题

“分库分表" ?选型和流程要慎重,否则会失控

恭喜你,贵公司终于成长到一定规模,需要考虑高可用,甚至分库分表了。但你是否知道分库分表需要哪些要素?拆分过程是复杂的,提前计划,不要等真正开工,各种意外的工作接...

16140
来自专栏信息化漫谈

实测用mysqldump备份mysql数据

最近为测试几种数据备份的工具,对mysql的备份方式、备份工具进行了详细测试,今天测试mysql原生工具mysqldump。

11120

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励