前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >canal 源码解析系列-CanalServerWithEmbedded解读

canal 源码解析系列-CanalServerWithEmbedded解读

作者头像
用户7634691
发布2021-09-08 11:31:52
6270
发布2021-09-08 11:31:52
举报

前面的文章简单说过这个类。canal server模块的核心接口为CanalServer,有两个实现:

  • CanalServerWithEmbedded
  • CanalServerWithNetty

这两个实现代表了canal的两种应用模式,CanalServerWithNetty在canal独立部署场景发挥作用,开发者只需要实现cient,不同的应用通过canal client与canal server进行通信,canal client的请求统一由CanalServerWithNetty接受进行处理。

而通过CanalServerWithEmbeded,可以不需要独立部署canal,而是把canal嵌入到我们自己的服务里。但是这种对开发者的要求就比较高。

下面的图表示二者的关系,

分层结构图.png

从图上可以看出,CanalServerWithNetty是client通过http访问的canal的门户,而它往下其实也是调用CanalServerWithEmbedded来完成具体的工作。所以我们就来具体说说这个CanalServerWithEmbedded

从上图还可以看出,CanalServerWithEmbedded 内部管理所有的CanalInstance,通过 Client 的信息(destination),找到 Client 订阅的 CanalInstance,然后调用 CanalInstance 内部的各个模块进行处理。关于instance后面会有专门的文章分析,这里暂且不表。

下面涉及到源码的地方,我都经过了处理,删减了一些不重要的代码(比如参数校验),便于理解

canal处理client请求的核心逻辑都在SessionHandler这个类中,这个handler在接收到客户端请求是调用CanalServerWithEmbeded对应的方法进行处理,如下所示:

代码语言:javascript
复制
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        long start = System.nanoTime();
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        ClientIdentity clientIdentity = null;
        try {
            switch (packet.getType()) {
                case SUBSCRIPTION://订阅请求
                    ...

                        embeddedServer.subscribe(clientIdentity);
                        ...
                    } else {
                        byte[] errorBytes = NettyUtils.errorPacket(401,
                            MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage());
                        NettyUtils.write(ctx.getChannel(),
                            errorBytes,
                            new ChannelFutureAggregator(sub.getDestination(),
                                sub,
                                packet.getType(),
                                errorBytes.length,
                                System.nanoTime() - start,
                                (short) 401));
                    }
                    break;
                case UNSUBSCRIPTION://取消订阅
                    ...
                        embeddedServer.unsubscribe(clientIdentity);
                        stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
                        ...
                    break;
                case GET://获取binlog
                    Get get = CanalPacket.Get.parseFrom(packet.getBody());
                    ...
                        if (get.getTimeout() == -1) {// 是否是初始值
                            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                        } else {
                            TimeUnit unit = convertTimeUnit(get.getUnit());
                            message = embeddedServer.getWithoutAck(clientIdentity,
                                get.getFetchSize(),
                                get.getTimeout(),
                                unit);
                        }
                        // }

                        ...
                    break;
                case CLIENTACK://客户端消费成功ack
                    ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
                    ...
                        clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
                        embeddedServer.ack(clientIdentity, ack.getBatchId());
                        ...
                                                break;
                case CLIENTROLLBACK://客户端消费失败回滚请求
                    ...
                        if (rollback.getBatchId() == 0L) {
                            embeddedServer.rollback(clientIdentity);// 回滚所有批次
                        } else {
                            embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
                        }
                        ...
                                            break;
                default:
                    byte[] errorBytes = NettyUtils.errorPacket(400,
                        MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
                    NettyUtils.write(ctx.getChannel(), errorBytes, new ChannelFutureAggregator(ctx.getChannel()
                        .getRemoteAddress()
                        .toString(), null, packet.getType(), errorBytes.length, System.nanoTime() - start, (short) 400));
                    break;
    }

来看下CanalServerWithEmbedded类,

代码语言:javascript
复制
public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {

    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    private Map<String, CanalInstance> canalInstances;//destination --> CanalInstance
    ...

实现了一个抽象类,两个接口。里面的一个map存放的是destination和CanalInstance的对应关系,这个我们前面的文章说过。

AbstractCanalLifeCycleCanalServer主要是继承的start和stop方法,很简单,这里不表。来看看实现的CanalService的接口。

代码语言:javascript
复制
public interface CanalService {

    //订阅
    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;
    //取消订阅
    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;
    //获取数据,自动ack
    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
    //获取数据,自动ack,可以指定超时时间
    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;
    //获取数据,不ack
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;
    //获取数据,不ack,超时时间
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                    throws CanalServerException;
    //ack某个批次的数据
    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;
    //回滚没有ack的批次的数据
    void rollback(ClientIdentity clientIdentity) throws CanalServerException;
    //回滚某个批次的数据
    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
}

先来看看订阅方法,

代码语言:javascript
复制
/**
     * 客户端订阅,重复订阅时会更新对应的filter信息
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());//检查 destination对应的instance已经启动

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        //instance持有的metaManager没启动的话就重新启动
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }

        //通过CanalInstance的CanalMetaManager组件进行元数据管理,记录一下当前这个CanalInstance有客户端在订阅
        canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅

        //客户端当前订阅的binlog位置
        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            //如果是第一次订阅,尝试从CanalEventStore中获取第一个binlog的位置,作为客户端订阅开始的位置。
            position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, position);
        }

        // 通知下订阅关系变化
        canalInstance.subscribeChange(clientIdentity);
    }

解释下。首先可以看到是通过canalInstance下的MetaManager管理订阅,包括binlog的位置这些都在MetaManager下保存。CanalMetaManager接口有几个实现类:

代码语言:javascript
复制
FileMixedMetaManager
MemoryMetaManager
MixedMetaManager
PeriodMixedMetaManager
ZooKeeperMetaManager

这些实现类之间有些会持有其它实现的引用来装饰自己的功能。具体服务中使用哪个实现是我们可以设置的,通过指定对应的枚举设置,下面是个例子:

代码语言:javascript
复制
public static enum MetaMode {
        /** 内存存储模式 */
        MEMORY,
        /** 文件存储模式 */
        ZOOKEEPER,
        /** 混合模式,内存+文件 */
        MIXED,
        /** 本地文件存储模式 */
        LOCAL_FILE;
...

CanalParameter parameter = new CanalParameter();

        parameter.setZkClusters(Arrays.asList(cluster1));
        parameter.setMetaMode(MetaMode.MEMORY);
        ...

然后是取消订阅方法,

代码语言:javascript
复制
/**
     * 取消订阅
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅

        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

这个比较简单,不做过多解释。

看下get方法,

代码语言:javascript
复制
/**
     * 获取数据
     *
     * <pre>
     * 注意:meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
     * </pre>
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return get(clientIdentity, batchSize, null, null);
    }

最终调用的是下面这个,

代码语言:javascript
复制
/**
     * 获取数据,可以指定超时时间.
     *
     * <pre>
     * 几种case:
     * a. 如果timeout为null,则采用tryGet方式,即时获取
     * b. 如果timeout不为null
     *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回
     *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少
     * 
     * 注意:meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
     * </pre>
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
                                                                                                 throws CanalServerException {
        ...
        synchronized (canalInstance) {//会读写流数据,这里加锁防止并发
            // 获取到流式数据中的最后一批获取的位置
            //从CanalMetaManager中获取最后一个没有ack的binlog批次的位置信息
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);

            if (positionRanges != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
                    clientIdentity.getClientId(),
                    positionRanges));
            }

            Events<Event> events = null;
            //从当前store中的第一条开始获取
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);

            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
                    clientIdentity.getClientId(),
                    batchSize);
                //如果获取到的binlog消息为空,构造一个空的Message对象,将batchId设置为-1返回给客户端
                return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
            } else {
                // 记录到流式信息
                //如果获取到了binlog消息,将这个批次的binlog消息记录到CanalMetaMaager中,并生成一个唯一的batchId
                Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
                boolean raw = isRaw(canalInstance.getEventStore());
                List entrys = null;
                if (raw) {
                    entrys = Lists.transform(events.getEvents(), Event::getRawEntry);
                } else {
                    entrys = Lists.transform(events.getEvents(), Event::getEntry);
                }
                if (logger.isInfoEnabled()) {
                    logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                        clientIdentity.getClientId(),
                        batchSize,
                        entrys.size(),
                        batchId,
                        events.getPositionRange());
                }
                // 直接提交ack
                ack(clientIdentity, batchId);
                //根据获取到的信息,构造message返回
                return new Message(batchId, raw, entrys);
            }
        }
    }

getWithoutAck就是不带ack的get,所以这里就不多说了。

ack方法如下,

代码语言:javascript
复制
/**
     * 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
     *
     * <pre>
     * 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
     * </pre>
     */
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        ...

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange<LogPosition> positionRanges = null;
        positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置
        if (positionRanges == null) { // 说明是重复的ack/rollback
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(),
                batchId));
        }



        // 更新cursor
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}",
                    clientIdentity.getClientId(),
                    batchId,
                    positionRanges);
            }
        }

        // 可定时清理数据
        //从CanalEventStore中,将这个批次的binlog内容移除
        canalInstance.getEventStore().ack(positionRanges.getEnd(), positionRanges.getEndSeq());
    }

ack方法是客户端用户确认某个批次的binlog消费成功。确认之后,小于等于此 batchId 的 Message 都会被确认(其实就是更新消费成功的位置)。

rollback是用来回滚的。

代码语言:javascript
复制
/**
     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
     */
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅
        boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
        if (!hasSubscribe) {
            return;
        }

        synchronized (canalInstance) {
            // 清除batch信息,所有批次
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            // rollback eventStore中的状态信息
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
        }
    }

eventstore的rollback方法也很简单,主要是更新下标,

代码语言:javascript
复制
public void rollback() throws CanalStoreException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            getSequence.set(ackSequence.get());//更新最后一次get的位置为ack的位置
            getMemSize.set(ackMemSize.get());//更新get内存大小为ack内存大小
        } finally {
            lock.unlock();
        }
    }

参考:

  • http://www.tianshouzhi.com/api/tutorials/canal/382
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-08-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档