前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Canal入门

Canal入门

作者头像
王知无-import_bigdata
发布2019-10-15 17:27:54
1.1K0
发布2019-10-15 17:27:54
举报

背景

以下内容来自canal官网介绍

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

数据库镜像

数据库实时备份

多级索引 (卖家和买家各自分库索引)

search build

业务cache刷新

价格变化等重要业务消息

项目介绍

名称:canal [kə’næl]

译意:水道/管道/沟渠

语言:纯java开发

定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql

关键词:mysql binlog parser / real-time / queue&topic

Canal的工作原理

mysql主备复制实现

从上层来看,复制分成三步:

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

编写客户端程序

代码语言:javascript
复制
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
</dependency>
代码语言:javascript
复制
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");①
    int batchSize = 1000;②
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*"); ③
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

}

说明:

① 指定ip和端口号

② 指定一次抓取的条数

③ 指定要监控的表,这里会覆盖配置文件

启动之后会看到如下信息

empty count : 1

empty count : 2

empty count : 3

empty count : 4

然后试着修改数据库,再观察控制台打印是否变化。

源码分析

canal服务端主要是获取binlog信息,canal客户端是负责把获取到的信息推送到不同的下游。我们将从了解canal的开始一步一步解析它。

需要注意,这里只分析canal客户端源码,并非canal-kafka源码。canal-kafka是将kafka作为客户端嵌入到canal里的,并且是直接将信息转成ByteString发送到kafka。

代码语言:javascript
复制
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

从示例中的程序我们可以发现,客户端getWithoutAck获取消息是批量拉取的方式,每次决定了要拉取多少条。这种形式类似kafka的consumer,这样整个客户端的处理数据能力就直接决定了canal的消费速度。

这里还需要说明getWithoutAck返回的是一个Message对象。这个Message是一个包含了一次请求返回多条数据的一个集合。

代码语言:javascript
复制
package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable {

    private static final long serialVersionUID = 1234034768477580009L;

    private long id;
    private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();

我们可以看到每个Message有一个id,entries中包含的就是我们请求返回的数据,类型是CanalEntry.Entry。 这里需要说明一下,在canal-kafka源码中Message类和canal中的Message是不一样的,主要是增加了两个属性。

代码语言:javascript
复制
package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable{

    private static final long serialVersionUID = 1234034768477580009L;

    private long id;
    @Deprecated
    private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
    // row data for performance, see:
    // https://github.com/alibaba/canal/issues/726
    private boolean raw = true;
    private List<ByteString> rawEntries = new ArrayList<ByteString>();

在canal-kafka中,Message最后是转换成ByteString发送到kafka的,这里是针对kafka场景使用。

代码语言:javascript
复制
package com.alibaba.otter.canal.kafka.producer;
...
  for (Topic topic : destination.getTopics()) {
         canalKafkaProducer.send(topic, message); // 发送message到所有topic
    }

Entry中包含

代码语言:javascript
复制
Header  
    logfileName [binlog文件名]  
    logfileOffset [binlog position]  
    executeTime [binlog里记录变更发生的时间戳,精确到秒]  
    schemaName   
    tableName  
    eventType [insert/update/delete类型]  
entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
storeValue  [byte数据,可展开,对应的类型为RowChange]
代码语言:javascript
复制
 System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

对于数据内容,字段信息需要解析成rowChage获取。

代码语言:javascript
复制
rowChage = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChage.getEventType();
rowChage.getBeforeColumnsList();
rowChage.getAfterColumnsList();
代码语言:javascript
复制
RowChange
   isDdl         [是否是ddl变更操作,比如create table/drop table]
   sql           [具体的ddl sql]
   rowDatas      [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
   beforeColumns [Column类型的数组,变更前的数据字段]
   afterColumns  [Column类型的数组,变更后的数据字段]

columns中的需要循环rowChage.getAfterColumnsList()获取

代码语言:javascript
复制
private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

欢迎点赞+收藏+转发朋友圈素质三连

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 编写客户端程序
  • 源码分析
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档