A look at canal's CanalAdapterService

code4it
Last modified 2020-04-07 11:05:38
Published in the column: 码匠的流水账

This article looks at canal's CanalAdapterService and at the CanalAdapterLoader it delegates to.

CanalAdapterService

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

@Component
@RefreshScope
public class CanalAdapterService {

    private static final Logger logger  = LoggerFactory.getLogger(CanalAdapterService.class);

    private CanalAdapterLoader  adapterLoader;

    @Resource
    private ContextRefresher    contextRefresher;

    @Resource
    private AdapterCanalConfig  adapterCanalConfig;
    @Resource
    private Environment         env;

    // Inject this bean so that it is registered first
    @Resource
    private SpringContext       springContext;
    @Resource
    private SyncSwitch          syncSwitch;

    private volatile boolean    running = false;

    @PostConstruct
    public synchronized void init() {
        if (running) {
            return;
        }
        try {
            logger.info("## start the canal client adapters.");
            adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
            adapterLoader.init();
            running = true;
            logger.info("## the canal client adapters are running now ......");
        } catch (Exception e) {
            logger.error("## something goes wrong when starting up the canal client adapters:", e);
        }
    }

    @PreDestroy
    public synchronized void destroy() {
        if (!running) {
            return;
        }
        try {
            running = false;
            logger.info("## stop the canal client adapters");

            if (adapterLoader != null) {
                adapterLoader.destroy();
                adapterLoader = null;
            }
            for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
                try {
                    druidDataSource.close();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
            DatasourceConfig.DATA_SOURCES.clear();
        } catch (Throwable e) {
            logger.warn("## something goes wrong when stopping canal client adapters:", e);
        } finally {
            logger.info("## canal client adapters are down.");
        }
    }
}
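  • CanalAdapterService exposes an init method (annotated with @PostConstruct) and a destroy method (annotated with @PreDestroy). Both are synchronized and guarded by the volatile running flag, so a repeated call is a no-op. init creates a CanalAdapterLoader from adapterCanalConfig and calls adapterLoader.init(); if that throws, the exception is logged and running stays false. destroy calls adapterLoader.destroy(), then iterates over DatasourceConfig.DATA_SOURCES and calls druidDataSource.close() on each entry, and finally clears DatasourceConfig.DATA_SOURCES.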
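The guard logic described above can be isolated from Spring and canal. The following is only a minimal sketch of the same pattern: LifecycleSketch, its events list, and isRunning are hypothetical names introduced for illustration and are not part of canal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CanalAdapterService; it only records what happened.
class LifecycleSketch {

    private final List<String> events = new ArrayList<>();

    // Same idea as the `running` field in CanalAdapterService.
    private volatile boolean running = false;

    // Starts once; further calls are no-ops while the service is running.
    public synchronized void init() {
        if (running) {
            return;
        }
        events.add("start");
        running = true;
    }

    // Stops once; a no-op if the service never started or is already stopped.
    public synchronized void destroy() {
        if (!running) {
            return;
        }
        running = false;
        events.add("stop");
    }

    // Returns a copy so callers cannot mutate the internal list.
    public synchronized List<String> events() {
        return new ArrayList<>(events);
    }

    public boolean isRunning() {
        return running;
    }
}
```

Calling destroy() before init(), or calling either method twice in a row, leaves the recorded events as a single "start" followed by a single "stop".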

CanalAdapterLoader

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

public class CanalAdapterLoader {

    private static final Logger                     logger        = LoggerFactory.getLogger(CanalAdapterLoader.class);

    private CanalClientConfig                       canalClientConfig;

    private Map<String, CanalAdapterWorker>         canalWorkers  = new HashMap<>();

    private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();

    private ExtensionLoader<OuterAdapter>           loader;

    public CanalAdapterLoader(CanalClientConfig canalClientConfig){
        this.canalClientConfig = canalClientConfig;
    }

    /**
     * Initialize the canal client
     */
    public void init() {
        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);

        String canalServerHost = this.canalClientConfig.getCanalServerHost();
        SocketAddress sa = null;
        if (canalServerHost != null) {
            String[] ipPort = canalServerHost.split(":");
            sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
        }
        String zkHosts = this.canalClientConfig.getZookeeperHosts();

        if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initialize the canal-client adapters
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();

                for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
                    for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
                        loadAdapter(c, canalOutConnectors);
                    }
                    canalOuterAdapterGroups.add(canalOutConnectors);
                }
                CanalAdapterWorker worker;
                if (sa != null) {
                    worker = new CanalAdapterWorker(canalClientConfig,
                        canalAdapter.getInstance(),
                        sa,
                        canalOuterAdapterGroups);
                } else if (zkHosts != null) {
                    worker = new CanalAdapterWorker(canalClientConfig,
                        canalAdapter.getInstance(),
                        zkHosts,
                        canalOuterAdapterGroups);
                } else {
                    throw new RuntimeException("No canal server connector found");
                }
                canalWorkers.put(canalAdapter.getInstance(), worker);
                worker.start();
                logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
            }
        } else if ("kafka".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initialize the canal-client-kafka adapters
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                        loadAdapter(config, canalOuterAdapters);
                    }
                    canalOuterAdapterGroups.add(canalOuterAdapters);

                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
                        canalClientConfig.getMqServers(),
                        canalAdapter.getInstance(),
                        group.getGroupId(),
                        canalOuterAdapterGroups,
                        canalClientConfig.getFlatMessage());
                    canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                    canalKafkaWorker.start();
                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                        canalAdapter.getInstance() + "-" + group.getGroupId());
                }
            }
        } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) {
            // Initialize the canal-client-rocketMQ adapters
            for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
                    for (OuterAdapterConfig config : group.getOuterAdapters()) {
                        loadAdapter(config, canalOuterAdapters);
                    }
                    canalOuterAdapterGroups.add(canalOuterAdapters);
                    CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
                        canalClientConfig.getMqServers(),
                        canalAdapter.getInstance(),
                        group.getGroupId(),
                        canalOuterAdapterGroups,
                        canalClientConfig.getAccessKey(),
                        canalClientConfig.getSecretKey(),
                        canalClientConfig.getFlatMessage(),
                        canalClientConfig.isEnableMessageTrace(),
                        canalClientConfig.getCustomizedTraceTopic(),
                        canalClientConfig.getAccessChannel(),
                        canalClientConfig.getNamespace());
                    canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                    rocketMQWorker.start();

                    logger.info("Start adapter for canal-client mq topic: {} succeed",
                        canalAdapter.getInstance() + "-" + group.getGroupId());
                }
            }
        }
    }

    private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
        try {
            OuterAdapter adapter;
            adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));

            ClassLoader cl = Thread.currentThread().getContextClassLoader();
            // Swap the thread context ClassLoader for the adapter's own ClassLoader
            Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader());
            Environment env = (Environment) SpringContext.getBean(Environment.class);
            Properties evnProperties = null;
            if (env instanceof StandardEnvironment) {
                evnProperties = new Properties();
                for (PropertySource<?> propertySource : ((StandardEnvironment) env).getPropertySources()) {
                    if (propertySource instanceof EnumerablePropertySource) {
                        String[] names = ((EnumerablePropertySource<?>) propertySource).getPropertyNames();
                        for (String name : names) {
                            Object val = propertySource.getProperty(name);
                            if (val != null) {
                                evnProperties.put(name, val);
                            }
                        }
                    }
                }
            }
            adapter.init(config, evnProperties);
            Thread.currentThread().setContextClassLoader(cl);
            canalOutConnectors.add(adapter);
            logger.info("Load canal adapter: {} succeed", config.getName());
        } catch (Exception e) {
            logger.error("Load canal adapter: {} failed", config.getName(), e);
        }
    }

    /**
     * Destroy all adapters. They are stopped in parallel so that a large number of canal instances does not block shutdown.
     */
    public void destroy() {
        if (!canalWorkers.isEmpty()) {
            ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
            for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
                stopExecutorService.execute(canalAdapterWorker::stop);
            }
            stopExecutorService.shutdown();
            try {
                while (!stopExecutorService.awaitTermination(1, TimeUnit.SECONDS)) {
                    // ignore
                }
            } catch (InterruptedException e) {
                // ignore
            }
        }

        if (!canalMQWorker.isEmpty()) {
            ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
            for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
                stopMQWorkerService.execute(canalAdapterMQWorker::stop);
            }
            stopMQWorkerService.shutdown();
            try {
                while (!stopMQWorkerService.awaitTermination(1, TimeUnit.SECONDS)) {
                    // ignore
                }
            } catch (InterruptedException e) {
                // ignore
            }
        }
        logger.info("All canal adapters destroyed");
    }
}
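In the loadAdapter method above, the thread context ClassLoader is replaced before adapter.init(...) and restored afterwards, but the restore is not in a finally block, so it is skipped if init throws. A common way to write this kind of swap so that the restore always runs is sketched below. ContextClassLoaderSketch and callWith are hypothetical names, and this is an illustration of the general pattern, not canal's code.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating a context ClassLoader swap with guaranteed restore.
class ContextClassLoaderSketch {

    // Runs `task` with `loader` installed as the current thread's context ClassLoader,
    // then puts the previous ClassLoader back, even if `task` throws.
    static <T> T callWith(ClassLoader loader, Supplier<T> task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```

With such a helper, the adapter initialization step would be passed as the task and the adapter's own ClassLoader as the first argument.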
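  • CanalAdapterLoader's init method obtains the loader for OuterAdapter via ExtensionLoader.getExtensionLoader(OuterAdapter.class), then initializes differently depending on canalClientConfig.getMode(), which can be tcp, kafka, or rocketMQ. In tcp mode it creates one CanalAdapterWorker per canal instance (connecting through canalServerHost if set, otherwise through zkHosts), puts it into canalWorkers, and calls worker.start(). In kafka mode it creates one CanalAdapterKafkaWorker per group, puts it into canalMQWorker, and calls canalKafkaWorker.start(). In rocketMQ mode it creates one CanalAdapterRocketMQWorker per group, puts it into canalMQWorker, and calls rocketMQWorker.start(). The destroy method iterates over canalWorkers.values() and canalMQWorker.values(), submits each worker's stop method to a fixed thread pool sized to the number of workers, and waits until the pool has terminated.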
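The parallel shutdown performed by destroy can be reduced to a small standalone sketch. ParallelStopSketch and stopAll are hypothetical names, and plain Runnable values stand in for the workers' stop methods. One deliberate difference from the listing above is labeled in the comments: this sketch re-asserts the interrupt flag instead of ignoring InterruptedException.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stop many workers in parallel and wait for all of them.
class ParallelStopSketch {

    static void stopAll(List<Runnable> stopActions) {
        if (stopActions.isEmpty()) {
            // Executors.newFixedThreadPool(0) throws IllegalArgumentException,
            // which is why an emptiness check comes first.
            return;
        }
        // One thread per action, so a slow stop does not delay the others.
        ExecutorService pool = Executors.newFixedThreadPool(stopActions.size());
        for (Runnable stop : stopActions) {
            pool.execute(stop);
        }
        // No new tasks are accepted; already submitted ones still run.
        pool.shutdown();
        try {
            while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                // keep polling until every stop action has finished
            }
        } catch (InterruptedException e) {
            // Assumption for this sketch: preserve the interrupt status
            // (the original listing ignores the exception instead).
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the pool is sized to the number of actions, the total wait is roughly that of the slowest stop rather than the sum of all of them.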

Summary

CanalAdapterService exposes init and destroy methods. init creates a CanalAdapterLoader from adapterCanalConfig and calls adapterLoader.init(); destroy calls adapterLoader.destroy(), then iterates over DatasourceConfig.DATA_SOURCES and calls druidDataSource.close() on each entry, and finally clears DatasourceConfig.DATA_SOURCES.

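Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If you believe it infringes your rights, contact cloudcommunity@tencent.com to have it removed.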