前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊CanalMQStarter

聊聊CanalMQStarter

原创
作者头像
code4it
修改2020-04-20 11:43:42
2970
修改2020-04-20 11:43:42
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下CanalMQStarter

CanalMQStarter

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

代码语言:javascript
复制
public class CanalMQStarter {
​
    private static final Logger          logger         = LoggerFactory.getLogger(CanalMQStarter.class);
​
    private volatile boolean             running        = false;
​
    private ExecutorService              executorService;
​
    private CanalMQProducer              canalMQProducer;
​
    private MQProperties                 properties;
​
    private CanalServerWithEmbedded      canalServer;
​
    private Map<String, CanalMQRunnable> canalMQWorks   = new ConcurrentHashMap<>();
​
    private static Thread                shutdownThread = null;
​
    public CanalMQStarter(CanalMQProducer canalMQProducer){
        this.canalMQProducer = canalMQProducer;
    }
​
    public synchronized void start(MQProperties properties, String destinations) {
        try {
            if (running) {
                return;
            }
            this.properties = properties;
            canalMQProducer.init(properties);
            // set filterTransactionEntry
            if (properties.isFilterTransactionEntry()) {
                System.setProperty("canal.instance.filter.transaction.entry", "true");
            }
​
            canalServer = CanalServerWithEmbedded.instance();
​
            // 对应每个instance启动一个worker线程
            executorService = Executors.newCachedThreadPool();
            logger.info("## start the MQ workers.");
​
            String[] dsts = StringUtils.split(destinations, ",");
            for (String destination : dsts) {
                destination = destination.trim();
                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
                canalMQWorks.put(destination, canalMQRunnable);
                executorService.execute(canalMQRunnable);
            }
​
            running = true;
            logger.info("## the MQ workers is running now ......");
​
            shutdownThread = new Thread() {
​
                public void run() {
                    try {
                        logger.info("## stop the MQ workers");
                        running = false;
                        executorService.shutdown();
                        canalMQProducer.stop();
                    } catch (Throwable e) {
                        logger.warn("##something goes wrong when stopping MQ workers:", e);
                    } finally {
                        logger.info("## canal MQ is down.");
                    }
                }
​
            };
​
            Runtime.getRuntime().addShutdownHook(shutdownThread);
        } catch (Throwable e) {
            logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
        }
    }
​
    public synchronized void destroy() {
        running = false;
        if (executorService != null) {
            executorService.shutdown();
        }
        if (canalMQProducer != null) {
            canalMQProducer.stop();
        }
        if (shutdownThread != null) {
            Runtime.getRuntime().removeShutdownHook(shutdownThread);
            shutdownThread = null;
        }
    }
​
    //......
}
  • CanalMQStarter提供了start、destroy方法;其start方法使用MQProperties来初始化canalMQProducer,然后通过CanalServerWithEmbedded.instance()获取canalServer,之后遍历destinations,创建canalMQRunnable提交给executorService执行,最后注册了shutdownThread,在jvm关闭时执行executorService.shutdown()及canalMQProducer.stop();其destroy方法也是执行executorService.shutdown()及canalMQProducer.stop(),它还会从Runtime.getRuntime()的shutdownHook移除shutdownThread

CanalMQRunnable

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

代码语言:javascript
复制
    private class CanalMQRunnable implements Runnable {
​
        private String destination;
​
        CanalMQRunnable(String destination){
            this.destination = destination;
        }
​
        private AtomicBoolean running = new AtomicBoolean(true);
​
        @Override
        public void run() {
            worker(destination, running);
        }
​
        public void stop() {
            running.set(false);
        }
    }
  • CanalMQRunnable实现了Runnable接口,其run方法执行worker(destination, running)

worker

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

代码语言:javascript
复制
public class CanalMQStarter {
​
    //......
​
    private void worker(String destination, AtomicBoolean destinationRunning) {
        while (!running || !destinationRunning.get()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // ignore
            }
        }
​
        logger.info("## start the MQ producer: {}.", destination);
        MDC.put("destination", destination);
        final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
        while (running && destinationRunning.get()) {
            try {
                CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
                if (canalInstance == null) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    continue;
                }
                MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination();
                canalDestination.setCanalDestination(destination);
                CanalMQConfig mqConfig = canalInstance.getMqConfig();
                canalDestination.setTopic(mqConfig.getTopic());
                canalDestination.setPartition(mqConfig.getPartition());
                canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                canalDestination.setPartitionHash(mqConfig.getPartitionHash());
​
                canalServer.subscribe(clientIdentity);
                logger.info("## the MQ producer: {} is running now ......", destination);
​
                Long getTimeout = properties.getCanalGetTimeout();
                int getBatchSize = properties.getCanalBatchSize();
                while (running && destinationRunning.get()) {
                    Message message;
                    if (getTimeout != null && getTimeout > 0) {
                        message = canalServer.getWithoutAck(clientIdentity,
                            getBatchSize,
                            getTimeout,
                            TimeUnit.MILLISECONDS);
                    } else {
                        message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
                    }
​
                    final long batchId = message.getId();
                    try {
                        int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
                        if (batchId != -1 && size != 0) {
                            canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {
​
                                @Override
                                public void commit() {
                                    canalServer.ack(clientIdentity, batchId); // 提交确认
                                }
​
                                @Override
                                public void rollback() {
                                    canalServer.rollback(clientIdentity, batchId);
                                }
                            }); // 发送message到topic
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }
​
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            }
        }
    }
​
    //......
}
  • worker方法创建ClientIdentity,然后根据destination从canalServer.getCanalInstances()获取canalInstance,然后创建canalDestination,之后执行canalServer.subscribe(clientIdentity);然后while循环执行canalServer.getWithoutAck拉取message,通过canalMQProducer.send进行发送

小结

CanalMQStarter提供了start、destroy方法;其start方法使用MQProperties来初始化canalMQProducer,然后通过CanalServerWithEmbedded.instance()获取canalServer,之后遍历destinations,创建canalMQRunnable提交给executorService执行,最后注册了shutdownThread,在jvm关闭时执行executorService.shutdown()及canalMQProducer.stop();其destroy方法也是执行executorService.shutdown()及canalMQProducer.stop(),它还会从Runtime.getRuntime()的shutdownHook移除shutdownThread

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CanalMQStarter
  • CanalMQRunnable
  • worker
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档