前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >canal源码解析(1)—HA模式的实现

canal源码解析(1)—HA模式的实现

作者头像
Monica2333
发布2020-06-19 17:45:18
1.8K0
发布2020-06-19 17:45:18
举报
文章被收录于专栏:码农知识点码农知识点

最近在看canal源码,有一些疑问。比如canal的HA模式是怎么实现的,mysql dump的位点又是怎么确定的,canal客户端是如何获取数据和ack的,又是如何实现mysql主备切换的等等,针对这些疑问我将输出几篇源码分析,欢迎指正交流。本文是关于canal 服务端和客户端的HA实现源码分析。在此之前,建议大家对canal的整体架构有所了解,可参考官方文档。首先看下官方文档中对HA机制的描述。

HA机制设计

canal的ha分为两部分,canal server和canal client分别有对应的ha实现。 canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态. canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。 整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。

canal-server

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.下文将开始分析ha的源码实现。

Canal Server HA实现

首先看下canal如何开启HA模式: canal.properties中加入以下配置:

代码语言:javascript
复制
//指定注册的zk地址
canal.zkServers =127.0.0.1:2181
//此配置会使用PeriodMixedMetaManager管理位点,会把ack位点注册到zk节点上,当failover时可从ack位点处重新消费
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

然后我们在看canal-server启动是如何和zookeeper交互的,这部分主要是canal server端的HA实现。 启动入口为CanalLauncher的main方法,canal.serverMode = tcp模式下的实际启动类为CanalController。 初始化CanalController时,会初始化canal在zookeeper上的节点系统目录。

代码语言:javascript
复制
public CanalController(final Properties properties){
······························省略与HA无关代码····································
 final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 如果不存在以下目录,则初始化系统目录
            // /otter/canal/destinations:用于存放instance信息
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
           // /otter/canal/cluster:用于存放canal-server节点信息
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }
······························省略与HA无关代码····································
}

启动CanalController时,

代码语言:javascript
复制
 public void start() throws Throwable {
        logger.info("## start the canal server[{}:{}]", ip, port);
        // 创建整个canal的工作节点
        final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
//1.
        initCid(path);
        if (zkclientx != null) {
//2.
            this.zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
        // 优先启动embeded服务
        embededCanalServer.start();
        // 尝试启动一下非lazy状态的通道
        for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
            final String destination = entry.getKey();
            InstanceConfig config = entry.getValue();
            // 创建destination的工作节点
            if (!embededCanalServer.isStart(destination)) {
                // 3.HA机制启动
                ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
                if (!config.getLazy() && !runningMonitor.isStart()) {
                    runningMonitor.start();
                }
            }

        // 启动网络接口
        if (canalServer != null) {
            canalServer.start();
        }
    }

1.会在/otter/canal/cluster节点下创建"ip:port"临时节点,如/otter/canal/cluster/10.33.200.132:11111 2.注册IZkStateListener,用来监听和zk的连接状态变化,这样当会话过期后重新建立新会话时再次创建"ip:port"临时节点。 3.HA机制启动。对于每一个instance,都会在/otter/canal/destinations节点下记录自己的canal-server和canal-client信息。每个canal-server对每个instance的管理是交给ServerRunningMonitor类的。 3.1.ServerRunningMonitor的初始化,调用ServerRunningMonitors.getRunningMonitor(destination)时如果为null,便会调用ServerRunningMonitor.apply方法。

代码语言:javascript
复制
        final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
//设置canal-server信息
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors
            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {

                public ServerRunningMonitor apply(final String destination) {
                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
//设置intance的名字
                    runningMonitor.setDestination(destination);
                    runningMonitor.setListener(new ServerRunningListener() {

                        public void processActiveEnter() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
//启动intance
                                embededCanalServer.start(destination);
                                if (canalMQStarter != null) {
                                    canalMQStarter.startDestination(destination);
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processActiveExit() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                if (canalMQStarter != null) {
                                    canalMQStarter.stopDestination(destination);
                                }
                                embededCanalServer.stop(destination);
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processStart() {
                            try {
                                if (zkclientx != null) {
                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
                                        ip + ":" + port);

                                    initCid(path);
                                    zkclientx.subscribeStateChanges(new IZkStateListener() {

                                        public void handleStateChanged(KeeperState state) throws Exception {

                                        }

                                        public void handleNewSession() throws Exception {
                                            initCid(path);
                                        }

                                        @Override
                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
                                            logger.error("failed to connect to zookeeper", error);
                                        }
                                    });
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processStop() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                if (zkclientx != null) {
                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
                                        ip + ":" + port);
                                    releaseCid(path);
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                    });
                    if (zkclientx != null) {
                        runningMonitor.setZkClient(zkclientx);
                    }
                    // 触发创建一下cid节点
                    runningMonitor.init();
                    return runningMonitor;
                }
            }));

3.2 ServerRunningMonitor的启动

代码语言:javascript
复制
 public synchronized void start() {
        super.start();
        try {
//3.2.1 
            processStart();
            if (zkClient != null) {
                //3.2.2  如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
//  /otter/canal/destinations/{destination}/running 节点
                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                zkClient.subscribeDataChanges(path, dataListener);

                initRunning();
            } else {
                processActiveEnter();// 没有zk,直接启动
            }
        } catch (Exception e) {
            logger.error("start failed", e);
            // 没有正常启动,重置一下状态,避免干扰下一次start
            stop();
        }
    }

3.2.1 调用processStart方法,这里会在/otter/canal/destinations/{destination}/cluster 节点下注册IZkStateListener,用来监听和zk的连接状态变化,同时创建"ip:port"临时节点。这个临时节点主要是用来给canal-client提供可用canal-server节点列表使用。 3.2.2 在/otter/canal/destinations/{destination}/running 节点下注册dataListener,用来监听该节点的数据增删改变化。 3.3.3初始化instance下的canal-server信息。

代码语言:javascript
复制
private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter();// 触发一下事件
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
            initRunning();
        }
    }

创建/otter/canal/destinations/{destination}/running 临时节点,不能创建成功会抛出ZkNodeExistsException异常,表明这个instance已经有其他canal-server负责binlog同步,此时会读取该临时节点数据,记录下来为其服务的canal-server节点数据到activeData中。

能创建成功表明这个instance由当前canal-server负责binlog同步,调用processActiveEnter方法启动这个instance。

所以/otter/canal/destinations/{destination}/running 临时节点 表示当前为该instance服务的canal-server节点是谁。如果canal-server与zk连接超时,会导致该临时节点被删除。此时每个canal-server注册在该节点上的dataListener便会监听到这一变化,做主备切换之类的操作。

代码语言:javascript
复制
//初始化ServerRunningMonitor时会初始化dataListener
 public ServerRunningMonitor(){
        // 创建父节点
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    release = true;
                    releaseRunning();// 彻底释放mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

可以看到,监听了/otter/canal/destinations/{destination}/running 临时节点的节点删除handleDataDeleted和节点数据变化handleDataChange两个事件。

当节点被删除时,如果上一次active的状态就是本机,则调用initRunning即时触发一下active抢占。否则就是等待delayTime之后在抢占,避免因网络瞬端或者zk异常,导致出现频繁的切换操作。

当节点修改时,主要是记录下此时/otter/canal/destinations/{destination}/running 临时节点下激活的canal-server是谁到activeData中。如果出现了主动释放,则彻底释放instance。(删除zk上该临时节点,关闭instance,没有在代码中看到release=false的情况)。

总结一下:/otter/canal/cluster节点下的临时子节点代表当前有多少个正常运行的canal-server。/otter/canal/destinations/{destination}/cluster代表当前instance下有多少可用的canal server。/otter/canal/destinations/{destination}/running 临时节点下的数据代表当前instance的激活canal-server是谁,每个正常运行的canal-server都会在/otter/canal/destinations/{destination}/running 临时节点下注册dataListener,用于及时做HA切换。

Canal Client HA实现

canal中一个instance只能由一个 client消费,接下来看一下canal-client的HA是如何通过zk实现的。官方给出了client test类ClusterCanalClientTest,核心代码就是完成了官方文档中增量订阅和消费过程:

代码语言:javascript
复制
// 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
        CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
//1.建立连接
                connector.connect();
//2.客户端订阅,不提交客户端filter,以服务端的filter为准
                connector.subscribe();
                while (running) {
//3. 不指定 position 获取事件,该方法返回的条件: 尝试拿batchSize条记录,有多少取多少,不会阻塞等待
//       canal 会记住此 client 最新的position。 
//        如果是第一次 fetch,则会从 canal 中保存的最老一条数据开始输出。

                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }

1.在canal client和canal server建立连接时,会从zookeeper上获取cluster和running节点下信息,并通过ClusterNodeAccessStrategy来注册listeners实时感知这些数据的变化,从而支持连接重试的failover。

代码语言:javascript
复制
public void connect() throws CanalClientException {
        while (currentConnector == null) {
            int times = 0;
            while (true) {
                try {
                    currentConnector = new SimpleCanalConnector(null, username, password, destination) {

                        @Override
                        public SocketAddress getNextAddress() {
                            return accessStrategy.nextNode();
                        }

                    };
                    currentConnector.setSoTimeout(soTimeout);
                    currentConnector.setIdleTimeout(idleTimeout);
                    if (filter != null) {
                        currentConnector.setFilter(filter);
                    }
                    if (accessStrategy instanceof ClusterNodeAccessStrategy) {
//1.1
                        currentConnector.setZkClientx(((ClusterNodeAccessStrategy) accessStrategy).getZkClient());
                    }

                    currentConnector.connect();
                    break;
                } catch (Exception e) {
                    logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
                    currentConnector.disconnect();
                    currentConnector = null;
                    // retry for #retryTimes for each node when trying to
                    // connect to it.
                    times = times + 1;
                    if (times >= retryTimes) {
                        throw new CanalClientException(e);
                    } else {
                        // fixed issue #55,增加sleep控制,避免重试connect时cpu使用过高
                        try {
                            Thread.sleep(retryInterval);
                        } catch (InterruptedException e1) {
                            throw new CanalClientException(e1);
                        }
                    }
                }
            }
        }
    }

1.1 设置当前connector的zkClient,并在zk上初始化客户端信息。

代码语言:javascript
复制
public void setZkClientx(ZkClientx zkClientx) {
        this.zkClientx = zkClientx;
        initClientRunningMonitor(this.clientIdentity);
    }

 private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) {
        if (zkClientx != null && clientIdentity != null && runningMonitor == null) {
            ClientRunningData clientData = new ClientRunningData();
            clientData.setClientId(clientIdentity.getClientId());
            clientData.setAddress(AddressUtils.getHostIp());

            runningMonitor = new ClientRunningMonitor();
            runningMonitor.setDestination(clientIdentity.getDestination());
            runningMonitor.setZkClient(zkClientx);
            runningMonitor.setClientData(clientData);
            runningMonitor.setListener(new ClientRunningListener() {

                public InetSocketAddress processActiveEnter() {
                    InetSocketAddress address = doConnect();
                    mutex.set(true);
                    if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
                        subscribe(filter);
                    }

                    if (rollbackOnConnect) {
                        rollback();
                    }

                    return address;
                }

                public void processActiveExit() {
                    mutex.set(false);
                    doDisconnect();
                }

            });
        }
    }

ClientRunningMonitor 类似于ServerRunningMonitor,是客户端对instance的管理。在真正建立连接currentConnector.connect();时,会启动ClientRunningMonitor

代码语言:javascript
复制
 public void connect() throws CanalClientException {
        if (connected) {
            return;
        }

        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
//启动ClientRunningMonitor
                runningMonitor.start();
            }
        } else {
            waitClientRunning();
            if (!running) {
                return;
            }
//从ClusterNodeAccessStrategy中选择当前instance正在工作的canal server进行连接
            doConnect();
            if (filter != null) { // 如果存在条件,说明是自动切换,基于上一次的条件订阅一次
                subscribe(filter);
            }
            if (rollbackOnConnect) {
                rollback();
            }
        }

        connected = true;
    }

可以看到与canal server建立连接前会启动ClientRunningMonitor,获取消费instance 的权利。

代码语言:javascript
复制
//ClientRunningMonitor.start
  public void start() {
        super.start();

        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }

与serverRunningMonitor 原理一样,在代表instance客户端的running节点/otter/canal/destinations/{destination}/{clientId}/running下注册dataListener,由dataListener监听节点数据变化负责客户端的HA切换。

代码语言:javascript
复制
public ClientRunningMonitor(){
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    release = true;
                    releaseRunning();// 彻底释放mainstem
                }

                activeData = (ClientRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                // 触发一下退出,可能是人为干预的释放操作或者网络闪断引起的session expired timeout
                processActiveExit();
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

这个临时节点的创建过程在ClientRunningMonitor.initRunning中,创建临时节点成功才能与canal server建立连接。临时节点写入信息为客户端的IP,port和clientId信息。

代码语言:javascript
复制
// 1,在方法上加synchronized关键字,保证同步顺序执行;
    // 2,判断Zk上已经存在的activeData是否是本机,是的话把mutex重置为true,否则会导致死锁
    // 3,增加异常处理,保证出现异常时,running节点能被删除,否则会导致死锁
    public synchronized void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(clientData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);

            processActiveEnter();// 触发一下事件,建立和canal server的连接
            activeData = clientData;
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
                // 如果发现已经存在,判断一下是否自己,避免活锁
                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
                    mutex.set(true);
                }
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
                true); // 尝试创建父节点
            initRunning();
        } catch (Throwable t) {
            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
                destination),
                t);

            // fixed issue 1220, 针对server节点不工作避免死循环
            if (t instanceof ServerNotFoundException) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }

            // 出现任何异常尝试release
            releaseRunning();
            throw new CanalClientException("something goes wrong in initRunning method. ", t);
        }
    }
  1. 客户端与服务端建立连接后,会发送SUBSCRIPTION 请求给服务端。subscribe主要是告诉canal server需要按照什么过滤条件来过滤库中的binlog信息,同时将当前clientIdentity告诉服务端。因为一个instance只能对应一个client,所以clientIdentity统一初始化为:
代码语言:javascript
复制
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
public ClientIdentity(String destination, short clientId){
        this.clientId = clientId;
        this.destination = destination;
    }
代码语言:javascript
复制
public void subscribe(String filter) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return;
        }
        try {
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.SUBSCRIPTION)
                .setBody(Sub.newBuilder()
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFilter(filter != null ? filter : "")
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            //
            Packet p = Packet.parseFrom(readNextPacket());
            Ack ack = Ack.parseFrom(p.getBody());
            if (ack.getErrorCode() > 0) {
                throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
            }

            clientIdentity.setFilter(filter);
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }

当canal server接收到SUBSCRIPTION请求时,会将客户端信息clientIdentity注册到对应instance

代码语言:javascript
复制
//embeddedServer.subscribe(clientIdentity)
    /**
     * 客户端订阅,重复订阅时会更新对应的filter信息
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
//1.
        canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅

        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
            if (position != null) {
//2.
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
        }

        // 通知下订阅关系变化
        canalInstance.subscribeChange(clientIdentity);
    }

主要逻辑为:

  1. 执行一下meta订阅。metaManager是负责管理客户端消费位点等信息,对于HA模式下,客户端位点,filter等信息会放到zk上,方便canal server切换时的共用。
代码语言:javascript
复制
//ZooKeeperMetaManager.subscribe
 public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
            clientIdentity.getClientId());

        try {
            zkClientx.createPersistent(path, true);
        } catch (ZkNodeExistsException e) {
            // ignore
        }
//如果客户端存在filter,则创建/otter/canal/destinations/{destination}/{clientId}/filter持久节点,存放客户端的filter信息。
        if (clientIdentity.hasFilter()) {
            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
                clientIdentity.getClientId());

            byte[] bytes = null;
            try {
                bytes = clientIdentity.getFilter().getBytes(ENCODE);
            } catch (UnsupportedEncodingException e) {
                throw new CanalMetaManagerException(e);
            }

            try {
                zkClientx.createPersistent(filterPath, bytes);
            } catch (ZkNodeExistsException e) {
                // ignore
                zkClientx.writeData(filterPath, bytes);
            }
        }
    }

如果客户端存在filter,则创建/otter/canal/destinations/{destination}/{clientId}/filter持久节点,存放客户端的filter信息。

2.如果canal server存在原来的位点信息,则通过后台定时任务将位点信息刷新到/otter/canal/destinations/{destination}/{clientId}/cursor持久节点中。

总结一下:canal client的HA模式同样是有临时节点和节点listener watch保证。当canal client与canal server建立连接前,会创建临时节点/otter/canal/destinations/{destination}/{clientId}/running,创建成功的client在该节点下写入自己的IP,port,clientId信息,表示当前该instance下激活的client是自己。同时每个canal client都会在节点上注册dataListener,监听节点数据变化负责客户端的HA切换。激活的client会通过ClusterNodeAccessStrategy获得zk上canal server的信息,得知当前instance下的激活canal server,并与之建立连接。之后客户端发送SUBSCRIPTION 请求给canal server,如果存在客户端位点,filter等信息,会注册到zk上,方便canal server切换时的共用。

至此canal的HA模式分析完毕,下篇文章将分析canal工作过程中的binlog位点是如何确定的。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • HA机制设计
  • Canal Server HA实现
  • Canal Client HA实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档