前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊CanalInstance

聊聊CanalInstance

作者头像
code4it
发布2020-04-16 11:07:38
3410
发布2020-04-16 11:07:38
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下CanalInstance

CanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java

代码语言:javascript
复制
public interface CanalLifeCycle {

    void start();

    void stop();

    boolean isStart();
}
  • CanalLifeCycle接口定义了start、stop、isStart方法

AbstractCanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.java

代码语言:javascript
复制
public abstract class AbstractCanalLifeCycle implements CanalLifeCycle {

    protected volatile boolean running = false; // 是否处于运行中

    public boolean isStart() {
        return running;
    }

    public void start() {
        if (running) {
            throw new CanalException(this.getClass().getName() + " has startup , don't repeat start");
        }

        running = true;
    }

    public void stop() {
        if (!running) {
            throw new CanalException(this.getClass().getName() + " isn't start , please check");
        }

        running = false;
    }

}
  • AbstractCanalLifeCycle实现了CanalLifeCycle接口,其定义了running属性,start方法设置running为true,stop设置running为false,isStart返回running值

CanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java

代码语言:javascript
复制
public interface CanalInstance extends CanalLifeCycle {

    String getDestination();

    CanalEventParser getEventParser();

    CanalEventSink getEventSink();

    CanalEventStore getEventStore();

    CanalMetaManager getMetaManager();

    CanalAlarmHandler getAlarmHandler();

    /**
     * 客户端发生订阅/取消订阅行为
     */
    boolean subscribeChange(ClientIdentity identity);

    CanalMQConfig getMqConfig();
}
  • CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

AbstractCanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

代码语言:javascript
复制
public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

    private static final Logger                      logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

    protected Long                                   canalId;                                                      // 和manager交互唯一标示
    protected String                                 destination;                                                  // 队列名字
    protected CanalEventStore<Event>                 eventStore;                                                   // 有序队列

    protected CanalEventParser                       eventParser;                                                  // 解析对应的数据信息
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 链接parse和store的桥接器
    protected CanalMetaManager                       metaManager;                                                  // 消费信息管理器
    protected CanalAlarmHandler                      alarmHandler;                                                 // alarm报警机制
    protected CanalMQConfig                          mqConfig;                                                     // mq的配置

    //......

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

        if (eventParser.isStart()) {
            beforeStopEventParser(eventParser);
            eventParser.stop();
            afterStopEventParser(eventParser);
        }

        if (eventSink.isStart()) {
            eventSink.stop();
        }

        if (eventStore.isStart()) {
            eventStore.stop();
        }

        if (metaManager.isStart()) {
            metaManager.stop();
        }

        if (alarmHandler.isStart()) {
            alarmHandler.stop();
        }

        logger.info("stop successful....");
    }

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 处理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                    if(singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if(eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }

        }

        // filter的处理规则
        // a. parser处理数据过滤处理
        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
        // 后续内存版的一对多分发,可以考虑
        return true;
    }


    @Override
    public String getDestination() {
        return destination;
    }

    @Override
    public CanalEventParser getEventParser() {
        return eventParser;
    }

    @Override
    public CanalEventSink getEventSink() {
        return eventSink;
    }

    @Override
    public CanalEventStore getEventStore() {
        return eventStore;
    }

    @Override
    public CanalMetaManager getMetaManager() {
        return metaManager;
    }

    @Override
    public CanalAlarmHandler getAlarmHandler() {
        return alarmHandler;
    }

    @Override
    public CanalMQConfig getMqConfig() {
        return mqConfig;
    }

    //......

}
  • AbstractCanalInstance继承了AbstractCanalLifeCycle,它覆盖了start、stop方法,其start方法分别启动metaManager、alarmHandler、eventStore、eventSink、eventParser,其stop方法分别关闭eventParser、eventSink、eventStore、metaManager、alarmHandler;其subscribeChange方法根据ClientIdentity的pattern创建AviaterRegexFilter,然后设置给eventParser

CanalInstanceWithSpring

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java

代码语言:javascript
复制
public class CanalInstanceWithSpring extends AbstractCanalInstance {

    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

    public void start() {
        logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
        super.start();
    }

    // ======== setter ========

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setEventParser(CanalEventParser eventParser) {
        this.eventParser = eventParser;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    public void setMetaManager(CanalMetaManager metaManager) {
        this.metaManager = metaManager;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public void setMqConfig(CanalMQConfig mqConfig){
        this.mqConfig = mqConfig;
    }

}
  • CanalInstanceWithSpring继承了AbstractCanalInstance,它专门给注册到spring容器使用

小结

CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

doc

  • CanalInstance
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CanalLifeCycle
  • AbstractCanalLifeCycle
  • CanalInstance
  • AbstractCanalInstance
  • CanalInstanceWithSpring
  • 小结
  • doc
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档