前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >canal 源码解析系列-store模块解析

canal 源码解析系列-store模块解析

作者头像
用户7634691
发布2021-09-29 15:55:42
6150
发布2021-09-29 15:55:42
举报

引言正文

引言

parser模块用来订阅binlog事件,然后通过sink投递到store。store模块用来执行最终的落库(基于内存),数据存储。

正文

核心接口是CanalEventStore,目前只有一个实现类MemoryEventStoreWithBuffer,这是一个基于内存buffer构建内存memory store。先来看下类图:

CanalStoreScavenge接口是干啥的?CanalEventStore都继承了它。看下它的接口定义:

代码语言:javascript
复制
/**
 * store空间回收机制,信息采集以及控制何时调用{@linkplain CanalEventStore}.cleanUtil()接口
 * 
 * @author jianghang 2012-8-8 上午11:57:42
 * @version 1.0.0
 */
public interface CanalStoreScavenge {

    /**
     * 清理position之前的数据
     */
    void cleanUntil(Position position) throws CanalStoreException;

    /**
     * 删除所有的数据
     */
    void cleanAll() throws CanalStoreException;
}

通过注释大概能看出来,这是专门抽象了一个接口来解决数据清理的问题。比如定时清理,满了之后清理,每次 ack 清理等。晚点再说清理的具体实现原理。

CanalEventStore接口定义的方法很多,不过总体可以分为4类接口:

  • put
  • get
  • ack
  • rollback

接口定义如下:

代码语言:javascript
复制
public interface CanalEventStore<T> extends CanalLifeCycle, CanalStoreScavenge {

    /**
     * 添加一组数据对象,阻塞等待其操作完成 (比如一次性添加一个事务数据)
     */
    void put(List<T> data) throws InterruptedException, CanalStoreException;

    /**
     * 添加一组数据对象,阻塞等待其操作完成或者时间超时 (比如一次性添加一个事务数据)
     */
    boolean put(List<T> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException;

    /**
     * 添加一组数据对象 (比如一次性添加一个事务数据)
     */
    boolean tryPut(List<T> data) throws CanalStoreException;

    /**
     * 添加一个数据对象,阻塞等待其操作完成
     */
    void put(T data) throws InterruptedException, CanalStoreException;

    /**
     * 添加一个数据对象,阻塞等待其操作完成或者时间超时
     */
    boolean put(T data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException;

    /**
     * 添加一个数据对象
     */
    boolean tryPut(T data) throws CanalStoreException;

    /**
     * 获取指定大小的数据,阻塞等待其操作完成
     */
    Events<T> get(Position start, int batchSize) throws InterruptedException, CanalStoreException;

    /**
     * 获取指定大小的数据,阻塞等待其操作完成或者时间超时
     */
    Events<T> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,
                                                                             CanalStoreException;

    /**
     * 根据指定位置,获取一个指定大小的数据
     */
    Events<T> tryGet(Position start, int batchSize) throws CanalStoreException;

    /**
     * 获取最后一条数据的position
     */
    Position getLatestPosition() throws CanalStoreException;

    /**
     * 获取第一条数据的position,如果没有数据返回为null
     */
    Position getFirstPosition() throws CanalStoreException;

    /**
     * 删除{@linkplain Position}之前的数据
     */
    void ack(Position position) throws CanalStoreException;

    /**
     * 删除指定seqId之前的数据
     * 
     * @Since 1.1.4
     */
    void ack(Position position, Long seqId) throws CanalStoreException;

    /**
     * 出错时执行回滚操作(未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)
     */
    void rollback() throws CanalStoreException;

}

在深入分析源码之前,要先理解EventStore的含义,EventStore是一个RingBuffer,有三个指针:Put、Get、Ack。

  • Put: Canal Server从MySQL拉取到数据后,放到内存中,Put增加
  • Get: 消费者(Canal Client)从内存中消费数据,Get增加
  • Ack: 消费者消费完成,Ack增加。并且会删除Put中已经被Ack的数据

RingBuffer是Disruptor中的设计概念,这个有兴趣的可以查阅相关资料

put方法的实现有很多种,但是核心都差不多,最终都是调用doPut方法,我们来看其中一个put方法的源码:

代码语言:javascript
复制
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
        if (data == null || data.isEmpty()) {
            return true;
        }

        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //检查是否满足插入条件
                if (checkFreeSlotAt(putSequence.get() + data.size())) {
                    doPut(data);
                    return true;
                }
                //判断是否已经超时,如果超时,则不执行插入操作,直接返回false
                if (nanos <= 0) {
                    return false;
                }

                try {
                    //队列满了,等一会
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果一直等待到超时,都没有可用空间可以插入,notFull.awaitNanos会抛出InterruptedException
                    //超时之后,唤醒一个其他执行put操作且未被中断的线程(感觉就是自己不行了,让其他兄弟再试试)
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

操作之前先加锁,MemoryEventStoreWithBuffer使用的是ReentrantLock,考虑到put操作的线程安全,这里加速是显而易见的。不过有个细节我们考虑下,为啥要把成员变量lock赋值给一个本地变量再进行后续的操作呢?

其实如果你看的源码比较多,会发现很多地方都有类似的操作。我个人理解,这是一个良好的编码习惯。它至少有两个好处:一是拷贝给方法的本地变量后,访问效率更高(一个是在堆上,一个是栈上)。另一个原因本地变量用了final修饰,可以保证在操作的过程中即使成员变量被修改了也不会影响到自己。

然后我们看到notFull变量,还有一个变量叫notEmpty,他们两个一般搭配使用:

代码语言:javascript
复制
// 阻塞put/get操作控制信号
    private ReentrantLock     lock          = new ReentrantLock();
    private Condition         notFull       = lock.newCondition();
    private Condition         notEmpty      = lock.newCondition();

这是一个比较经典的使用场景,在这个场景中,通常有生产者和消费者两个角色,然后有一个共享缓冲区。生产者用于将消息放入缓冲区,消费者用于从缓冲区中取出消息。

当缓冲区已经满了,而此时生产者还想放入一个新的数据的时候,解决方法是让生产者此时进行休眠(notFull.await),等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它(notFull.signal)。

同样当缓冲区已经空了,而消费者还想去取数据,此时让消费者进行休眠(notEmpty.await()),等待生产者放入一个或者多个数据时再唤醒它(notEmpty.signal)。我们可以用一幅图来描述上述流程:

checkFreeSlotAt方法是用来检查是否满足插入条件,看看它的实现:

代码语言:javascript
复制
/**
     * 查询是否有空位
     */
    private boolean checkFreeSlotAt(final long sequence) {
        //1、检查是否足够的slot。入参的sequence值是:当前putSequence位置 + 新插入的event的记录数。
        //结合文章中的图,其减去bufferSize不能大于ack位置,或者换一种说法,减去bufferSize不能大于ack位置。
        final long wrapPoint = sequence - bufferSize;//队列的长度默认是16*1024
        final long minPoint = getMinimumGetOrAck();//取get或者ack较小的
        if (wrapPoint > minPoint) { // 刚好追上一轮
            return false;
        } else {
            // 在bufferSize模式上,再增加memSize控制
            //2、如果batchMode是MEMSIZE,继续检查是否超出了内存限制
            if (batchMode.isMemSize()) {
                final long memsize = putMemSize.get() - ackMemSize.get();
                if (memsize < bufferSize * bufferMemUnit) {
                    return true;
                } else {
                    return false;
                }
            } else {
                return true;
            }
        }
    }

注释写的比较详细了,不多说。

接下来看看核心方法doPut

代码语言:javascript
复制
/**
     * 执行具体的put操作
     */
    private void doPut(List<Event> data) {
        long current = putSequence.get();//当前put的位置
        long end = current + data.size();

        // 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值
        for (long next = current + 1; next <= end; next++) {
            //通过getIndex方法对next变量转换成正确的位置,设置到Event[]数组中,
            // Event[]数组是环形队列的底层实现,其大小为bufferSize值,默认为16 * 1024。
            // getindex通过与操作一个mask防止越界
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }

        putSequence.set(end);//更新位置

        // 记录一下gets memsize信息,方便快速检索
        if (batchMode.isMemSize()) {
            long size = 0;
            for (Event event : data) {
                size += calculateSize(event);
            }

            putMemSize.getAndAdd(size);
        }
        profiling(data, OP.PUT);//记录put时间,监控用
        // tell other threads that store is not empty
        notEmpty.signal();
    }

总结下doPut方法,主要有4个操作:

  1. 的event数据赋值到Event[]数组上
  2. 更新ringbuffer上的put位置
  3. 如果是内存模式,计算新插入的event数据对内存的影响
  4. 调用notEmpty.signal()方法,通知buffer有数据了(get操作阻塞的线程可以被唤醒)

get方法不说了,理解了put很容易理解get方法。我们来看下ack方法:

代码语言:javascript
复制
public void ack(Position position) throws CanalStoreException {
        cleanUntil(position, -1L);
    }

前面我们提到了CanalStoreScavenge接口,有两个方法:cleanUntilcleanAll,后者在stop的时候调用,而前者就是在ack的时候调用。

代码语言:javascript
复制
public void cleanUntil(Position position, Long seqId) throws CanalStoreException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            long sequence = ackSequence.get();//当前ack的位置
            long maxSequence = getSequence.get();//当前get的位置

            boolean hasMatch = false;
            long memsize = 0;
            // ack没有list,但有已存在的foreach,还是节省一下list的开销
            long localExecTime = 0L;
            int deltaRows = 0;
            if (seqId > 0) {
                maxSequence = seqId;
            }
            //遍历所有未被ack的event,从中找出与需要ack的position相同位置的event,清空这个event之前的所有数据。
            for (long next = sequence + 1; next <= maxSequence; next++) {
                Event event = entries[getIndex(next)];//要ack的event
                if (localExecTime == 0 && event.getExecuteTime() > 0) {
                    localExecTime = event.getExecuteTime();
                }
                deltaRows += event.getRowsCount();//要ack的event包含的mysql行数
                memsize += calculateSize(event);//要ack的event占用字节数
                if ((seqId < 0 || next == seqId) && CanalEventUtils.checkPosition(event, (LogPosition) position)) {
                    // 找到对应的position,更新ack seq
                    hasMatch = true;

                    if (batchMode.isMemSize()) {
                        ackMemSize.addAndGet(memsize);
                        // 尝试清空buffer中的内存,将ack之前的内存全部释放掉
                        for (long index = sequence + 1; index < next; index++) {
                            entries[getIndex(index)] = null;// 设置为null,方便GC
                        }

                        // 考虑getFirstPosition/getLastPosition会获取最后一次ack的position信息
                        // ack清理的时候只处理entry=null,释放内存
                        Event lastEvent = entries[getIndex(next)];
                        lastEvent.setEntry(null);//方便gc
                        lastEvent.setRawEntry(null);//方便gc
                    }

                    //更新ack的值
                    if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
                        notFull.signal();//清空了就可以继续put数据了
                        ackTableRows.addAndGet(deltaRows);
                        if (localExecTime > 0) {
                            ackExecTime.lazySet(localExecTime);
                        }
                        return;
                    }
                }
            }
            if (!hasMatch) {// 找不到对应需要ack的position
                throw new CanalStoreException("no match ack position" + position.toString());
            }
        } finally {
            lock.unlock();
        }
    }

Ack操作实际上就是将消费成功的事件从队列中删除,如果一直不Ack的话,队列满了之后,Put操作就无法添加新的数据了。

rollback方法比较简单,这里不多说了。


参考:

  • https://www.bookstack.cn/read/canal-v1.1.4/34357b71c7c1f182.md#%E6%9E%B6%E6%9E%84
  • http://www.tianshouzhi.com/api/tutorials/canal/401
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 正文
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档