前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >canal 源码解析系列-canal的HA机制解析

canal 源码解析系列-canal的HA机制解析

作者头像
用户7634691
发布2021-09-29 15:56:04
1.1K0
发布2021-09-29 15:56:04
举报

canal 源码解析系列-canal的HA机制解析

引言

首先什么是HA?HA指的是High Available,也就是高可用。通常我们一个服务要支持HA都要借助于第三方的分布式同步协调服务,最常用的是zookeeper(以下简称ZK)。canal实现高可用,主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点。

正文

canal的整个HA机制,分为两部分。canal server和client都要有对应的实现。

首先在server端,为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。也就是说,只会有一个canal server的instance处于active状态,但是当这个instance down掉后会重新选出一个canal server。

在client端,为了保证有序性,一个instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

这里在解释下instance。instance 是 canal 数据同步的核心,在一个 canal 实例中只有启动instace才能进行数据的同步任务。一个 canal server 实例中可以创建多个 CanalInstance 实例。每一个 CanalInstance 可以看成是对应一个 MySQL 实例。

在开始分析canal的HA机制之前,有两个zk概念需要先铺垫下,分别是ZK的watcher和EPHEMERAL节点。他们是canal实现HA的基础。

先说说watcher。

Zookeeper采用了Watcher机制实现数据的发布/订阅功能,当被订阅对象发生变化时会通知订阅者(ZK客户端)。Watcher实现由三个部分组成:

  • ZK服务端
  • ZK客户端
  • 客户端的ZKWatchManager对象

客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的ZKWatchManager中。当ZK服务端监听的数据状态发生变化时,服务端主动通知客户端,客户端的ZKWatchManager会触发Watcher来回调处理逻辑。

EPHEMERAL节点也叫临时节点,是zk中的一种节点类型(还有永久节点)。这种节点和一次会话(session)绑定,会话断开节点会被删除。

canal server 启动时向 zookeeper 创建的节点就是临时节点,它与 session 生命周期绑定,当我手动执行关闭命令,客户端会话会失效,临时节点会自动清除; 一旦 zookeeper 发现 canal server 机器创建的节点消失后,就会通知其它的 canal server 再次进行向 zookeeper 尝试创建临时节点的操作,就会有新的 active 节点产生;

基础知识打好了,我们继续。

server端的HA机制

先来看看server端的HA实现机制。

首先在配置文件canal.properties中有如下配置:

代码语言:javascript
复制
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000

第一个是zk的地址,比如可以写127.0.0.1:2181,第二个配置是使用PeriodMixedMetaManager管理位点信息,这个信息会定时刷新到zk上。代码如下:

代码语言:javascript
复制
// 启动定时工作任务
        executor.scheduleAtFixedRate(() -> {
            List<ClientIdentity> tasks = new ArrayList<>(updateCursorTasks);
            for (ClientIdentity clientIdentity : tasks) {
                try {
                    updateCursorTasks.remove(clientIdentity);

                    // 定时将内存中的最新值刷到zookeeper中,多次变更只刷一次
                    zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
                } catch (Throwable e) {
                    // ignore
                    logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

canal server 在tcp模式下的调用链路是这样的:

com.alibaba.otter.canal.deployer.CanalLauncher#main --> com.alibaba.otter.canal.deployer.CanalStarter#start --> com.alibaba.otter.canal.deployer.CanalController#start

先看下CanalController这个类的构造方法:

代码语言:javascript
复制
//读取配置文件
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录
            ///otter/canal/destinations:用于存放instance信息
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            ///otter/canal/cluster,整个canal server的集群节点信息
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }
        ...

这里创建的目录都是永久节点。

然后CanalController#start的代码会初始化canal在zookeeper上的节点系统目录,

代码语言:javascript
复制
public void start() throws Throwable {
        logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
        // 创建整个canal的工作节点
        //会在/otter/canal/cluster节点下创建"ip:port"临时节点,如/otter/canal/cluster/127.0.0.1:8080
        //这里是根据ip+端口获取节点在zk的完整路径
        final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
        initCid(path);
        if (zkclientx != null) {
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                //会话过期后重新建立新会话时再次创建"ip:port"临时节点。
                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
        // 优先启动embeded服务
        embededCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            if (!embededCanalServer.isStart(destination)) {
                // HA机制启动
                //ServerRunningMonitor 是针对server的running节点控制类,用于管理instance
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

            ....
        }


    }

干了这么几件事情:

  • 会在/otter/canal/cluster节点下创建"ip:port"临时节点
  • 注册IZkStateListener,监听zk的连接状态变化,当会话过期后重新建立新会话时再次创建"ip:port"临时节点。
  • 启动canal server
  • 启动ServerRunningMonitor 3.HA机制启动。对于每一个instance,都会在/otter/canal/destinations节点下记录自己的canal-server和canal-client信息。每个canal-server对每个instance的管理是交给ServerRunningMonitor类的。

ServerRunningMonitor 是针对server的running节点控制类,用于管理instance。首先来看看它的构造方法:

代码语言:javascript
复制
public ServerRunningMonitor(){
        // 创建父节点

        //用来监听该节点的数据增删改变化的listener
        dataListener = new IZkDataListener() {

            //数据发生变化回调
            // "/otter/canal/destinations/{destination}/running" 临时节点
            // //表示当前为该instance服务的canal server节点是谁,如果canal server与zk连接超时,会导致该临时节点被删除。
            // canal server注册在该节点上的dataListener便会监听到这一变化,比如可以做主备切换之类的操作。
            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);//方便日志跟踪
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    //删除zk上该临时节点,关闭instance
                    releaseRunning();// 彻底释放mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            //数据删除回调

            /**
             * 当节点被删除时,如果上一次active的状态就是本机,调用initRunning即时触发一下抢占。
             * 否则就是等待delayTime之后在抢占,避免因网络瞬端或者zk异常,导致出现频繁的切换操作。
             */
            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                    delayExector.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

就是新建了一个IZkDataListener实例,并实现了它的回调方法。每个回调方法的功能注释都写得很清楚了。

再看看它的start方法:

代码语言:javascript
复制
public synchronized void start() {
        super.start();
        try {
            processStart();
            if (zkClient != null) {
                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                //在/otter/canal/destinations/{destination}/running 节点下注册dataListener,用来监听该节点的数据增删改变化。
                zkClient.subscribeDataChanges(path, dataListener);

                initRunning();//初始化instance下的canal-server信息
            } else {
                processActiveEnter();// 没有zk,直接启动
            }
        } catch (Exception e) {
            logger.error("start failed", e);
            // 没有正常启动,重置一下状态,避免干扰下一次start
            stop();
        }

    }

方法不长,调用了processStart方法:

代码语言:javascript
复制
public void processStart() {
                    try {
                        if (zkclientx != null) {
                            final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
                                registerIp + ":" + port);
                            initCid(path);

                            zkclientx.subscribeStateChanges(new IZkStateListener() {

                                public void handleStateChanged(KeeperState state) throws Exception {

                                }

                                public void handleNewSession() throws Exception {
                                    initCid(path);
                                }

                                @Override
                                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                                    logger.error("failed to connect to zookeeper", error);
                                }
                            });
                        }

processStart方法,会在/otter/canal/destinations/{destination}/cluster 节点下注册IZkStateListener,用来监听zk的连接状态变化,同时创建"ip:port"临时节点。这个临时节点主要是用来给canal client提供该instance下可用canal serve节点列表。

接着在/otter/canal/destinations/{destination}/running节点下注册dataListener,用来监听该节点的数据增删改变化。

initRunning方法用来初始化instance下的canal server信息:

代码语言:javascript
复制
private void initRunning() {
        if (!isStart()) {
            return;
        }

        //服务端当前正在提供服务的running节点路径
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            //mutex是canal自己实现的一个同步类,基于AQS
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);//创建临时节点
            activeData = serverData;
            processActiveEnter();// 触发一下事件
            mutex.set(true);
            release = false;
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
            initRunning();
        }
    }

mutex设置false,代表访问的线程需要被阻塞挂起,等待mutex变为true被唤醒。所以这里能保证在高并发情况下服务端当前正在提供服务的running节点只有一个。

我们来梳理下目前看到的永久节点和临时节点

  • /otter/canal/cluster 永久节点目录,集群的节点信息根目录,代表正在运行的canal server
  • /otter/canal/cluster/ip:port 临时节点,存放具体的节点信息
  • /otter/canal/destinations 存放多个instance信息
  • /otter/canal/destinations/{destination}/cluster 代表当前instance下有多少可用的canal server
  • /otter/canal/destinations/{destination}/running 临时节点下的数据代表当前instance的激活canal-server是谁,每个正常运行的canal-server都会在/otter/canal/destinations/{destination}/running 临时节点下注册dataListener,用于及时做HA切换。

client端的HA机制

前面说了,在client端,为了保证有序性,一个instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

client端的HA实现更多的依赖是canal的使用方,官方给了一个例子可以参考,那就是ClusterCanalClientTest类。使用起来很简单,如下:

代码语言:javascript
复制
// 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
        CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "canal", "canal");

        final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();

CanalConnector实现了failover的机制,支持失败重连,代码如下:

代码语言:javascript
复制
/**
     * 创建带cluster模式的客户端链接,自动完成failover切换,服务器列表自动扫描
     */
    public static CanalConnector newClusterConnector(String zkServers, String destination, String username,
                                                     String password) {
        ClusterCanalConnector canalConnector = new ClusterCanalConnector(username,
            password,
            destination,
            new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
        canalConnector.setSoTimeout(60 * 1000);
        canalConnector.setIdleTimeout(60 * 60 * 1000);
        return canalConnector;
    }

注意这个ClusterNodeAccessStrategy,CanalConnector就是通过ClusterNodeAccessStrategy来注册zk的listeners感知这些数据的变化。

代码语言:javascript
复制
if (accessStrategy instanceof ClusterNodeAccessStrategy) {
                        //初始化zkclient
                        currentConnector.setZkClientx(((ClusterNodeAccessStrategy) accessStrategy).getZkClient());
                    }
代码语言:javascript
复制
public void setZkClientx(ZkClientx zkClientx) {
        this.zkClientx = zkClientx;
        initClientRunningMonitor(this.clientIdentity);
    }

上面的代码,设置当前connector的zkClient,并在zk上初始化客户端信息。ClientRunningMonitor是客户端的instance管理类,它的启动在com.alibaba.otter.canal.client.impl.SimpleCanalConnector#connect,如下:

代码语言:javascript
复制
public void connect() throws CanalClientException {
        ...

        //这个runningMoitor是ClientRunningMonitor
        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
                runningMonitor.start();
            }
        } else {
            ...
        }

继续来看看runningMonitor.start里都干了啥,

代码语言:javascript
复制
public void start() {
        super.start();

        // /otter/canal/destinations/{destination}/{clientid}/running
        // 表示客户端目前正在工作的节点
        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        //监听数据变化,负责客户端的HA切换
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }

dataListener定义如下:

代码语言:javascript
复制
public ClientRunningMonitor(){
        dataListener = new IZkDataListener() {

            //数据变化时,保存当前的running data,其中包含客户端地址,clientid等
            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    release = true;
                    releaseRunning();// 彻底释放mainstem
                }

                activeData = (ClientRunningData) runningData;
            }


            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
                processActiveExit();
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                    delayExector.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

initRunning方法会在 /otter/canal/destinations/{destination}/{clientid}/running创建临时节点写入数据,写入信息为客户端的IP,port和clientId信息。创建临时节点成功才能与canal server建立连接。

这个临时节点的创建过程在ClientRunningMonitor.initRunning中,创建临时节点成功才能与canal server建立连接。临时节点写入信息为客户端的IP,port和clientId信息。

客户端的HA还有一个重要的逻辑,就是在HA模式下,客户端位点,filter等信息会放到zk上,方便canal server切换时的共用。我们前面的文章讲过,canal的meta信息管理器接口时CanalMetaManager,它有多个实现类,其中就有ZooKeeperMetaManager。这部分的代码如下:

代码语言:javascript
复制
public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
            clientIdentity.getClientId());

        try {
            zkClientx.createPersistent(path, true);
        } catch (ZkNodeExistsException e) {
            // ignore
        }
        ///如果客户端存在filter,则创建/otter/canal/destinations/{destination}/{clientId}/filter持久节点,存放客户端的filter信息。
        if (clientIdentity.hasFilter()) {
            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
                clientIdentity.getClientId());

            byte[] bytes = null;
            try {
                bytes = clientIdentity.getFilter().getBytes(ENCODE);
            } catch (UnsupportedEncodingException e) {
                throw new CanalMetaManagerException(e);
            }

            try {
                zkClientx.createPersistent(filterPath, bytes);
            } catch (ZkNodeExistsException e) {
                // ignore
                zkClientx.writeData(filterPath, bytes);
            }
        }
    }

最后总结下client端的HA机制。

canal client与canal server建立连接前,会创建临时节点/otter/canal/destinations/{destination}/{clientId}/running,并在改节点下写入自己的IP,port,clientId信息,表示当前该instance下激活的client是自己。

同时每个canal client都会在节点上注册dataListener,监听节点数据变化负责客户端的HA切换。当前被激活的client会通过ClusterNodeAccessStrategy(nextNode)获得zk上canal server的信息并与之建立连接。

客户端发送SUBSCRIPTION 请求给canal server,客户端位点,filter等信息会被注册到zk上,方便canal server切换时的共用。


参考:

  • https://www.bookstack.cn/read/canal-v1.1.4/34357b71c7c1f182.md#HA%E6%9C%BA%E5%88%B6%E8%AE%BE%E8%AE%A1
  • https://cloud.tencent.com/developer/article/1648637
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 犀牛的技术笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 正文
    • server端的HA机制
      • client端的HA机制
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档