背景
以下内容来自canal官网介绍
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
项目介绍
名称:canal [kə’næl]
译意:水道/管道/沟渠
语言:纯java开发
定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
关键词:mysql binlog parser / real-time / queue&topic
Canal的工作原理
mysql主备复制实现
从上层来看,复制分成三步:
canal的工作原理
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");①
int batchSize = 1000;②
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*"); ③
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
说明:
① 指定ip和端口号
② 指定一次抓取的条数
③ 指定要监控的表,这里会覆盖配置文件
启动之后会看到如下信息
empty count : 1
empty count : 2
empty count : 3
empty count : 4
然后试着修改数据库,再观察控制台打印是否变化。
canal服务端主要是获取binlog信息,canal客户端是负责把获取到的信息推送到不同的下游。我们将从了解canal的开始一步一步解析它。
需要注意,这里只分析canal客户端源码,并非canal-kafka源码。canal-kafka是将kafka作为客户端嵌入到canal里的,并且是直接将信息转成ByteString发送到kafka。
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
从示例中的程序我们可以发现,客户端getWithoutAck获取消息是批量拉取的方式,每次决定了要拉取多少条。这种形式类似kafka的consumer,这样整个客户端的处理数据能力就直接决定了canal的消费速度。
这里还需要说明getWithoutAck返回的是一个Message对象。这个Message是一个包含了一次请求返回多条数据的一个集合。
package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable {
private static final long serialVersionUID = 1234034768477580009L;
private long id;
private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
我们可以看到每个Message有一个id,entries中包含的就是我们请求返回的数据,类型是CanalEntry.Entry。 这里需要说明一下,在canal-kafka源码中Message类和canal中的Message是不一样的,主要是增加了两个属性。
package com.alibaba.otter.canal.protocol;
...
public class Message implements Serializable{
private static final long serialVersionUID = 1234034768477580009L;
private long id;
@Deprecated
private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
// row data for performance, see:
// https://github.com/alibaba/canal/issues/726
private boolean raw = true;
private List<ByteString> rawEntries = new ArrayList<ByteString>();
在canal-kafka中,Message最后是转换成ByteString发送到kafka的,这里是针对kafka场景使用。
package com.alibaba.otter.canal.kafka.producer;
...
for (Topic topic : destination.getTopics()) {
canalKafkaProducer.send(topic, message); // 发送message到所有topic
}
Entry中包含
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
对于数据内容,字段信息需要解析成rowChage获取。
rowChage = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChage.getEventType();
rowChage.getBeforeColumnsList();
rowChage.getAfterColumnsList();
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
columns中的需要循环rowChage.getAfterColumnsList()获取
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
欢迎点赞+收藏+转发朋友圈素质三连