前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊debezium的BlockingReader

聊聊debezium的BlockingReader

原创
作者头像
code4it
修改2020-05-18 11:02:20
3850
修改2020-05-18 11:02:20
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下debezium的BlockingReader

Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {
​
    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,
​
        /**
         * The reader is running and generated records.
         */
        RUNNING,
​
        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }
​
    public String name();
​
    public State state();
​
    public void uponCompletion(Runnable handler);
​
    public default void initialize() {
        // do nothing
    }
​
    public default void destroy() {
        // do nothing
    }
​
    public void start();
​
    public void stop();
​
    public List<SourceRecord> poll() throws InterruptedException;
}
  • Reader接口定义了name、state、uponCompletion、start、stop、poll方法

BlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java

public class BlockingReader implements Reader {
​
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>();
    private final Metronome metronome;
​
    private final String name;
    private final String runningLogMessage;
​
    public BlockingReader(String name, String runningLogMessage) {
        this.name = name;
        this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        this.runningLogMessage = runningLogMessage;
​
    }
​
    /**
     * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }
​
        metronome.pause();
        state.compareAndSet(State.RUNNING, State.STOPPING);
​
        return null;
    }
​
    @Override
    public State state() {
        return state.get();
    }
​
    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }
​
    @Override
    public void start() {
        state.set(State.RUNNING);
        logger.info(runningLogMessage);
    }
​
    @Override
    public void stop() {
        try {
            state.set(State.STOPPED);
​
            // Cleanup Resources
            Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
            if (completionHandler != null) {
                completionHandler.run();
            }
​
        }
        finally {
            logger.info("Blocking Reader has completed.");
        }
    }
​
    @Override
    public String name() {
        return name;
    }
​
}
  • BlockingReader实现了Reader接口,其start方法设置state为State.RUNNING,其stop方法设置state为State.STOPPED,同时执行completionHandler.run();其poll方法在state为State.STOPPED直接返回null,否则执行metronome.pause(),然后设置state为State.STOPPED,最后返回null

TimedBlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java

public class TimedBlockingReader extends BlockingReader {
​
    protected final Logger logger = LoggerFactory.getLogger(getClass());
​
    private final Duration timeout;
    private volatile Timer timer;
​
    /**
     * @param name Name of the reader
     * @param timeout Duration of time until this TimedBlockingReader should stop
     */
    public TimedBlockingReader(String name, Duration timeout) {
        super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
        this.timeout = timeout;
    }
​
    @Override
    public void start() {
        super.start();
        this.timer = Threads.timer(Clock.SYSTEM, timeout);
    }
​
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        super.poll();
​
        // Stop when we've reached the timeout threshold
        if (timer != null && timer.expired()) {
            stop();
        }
​
        return null;
    }
}
  • TimedBlockingReader继承了BlockingReader,其start方法通过Threads.timer(Clock.SYSTEM, timeout)创建了Timer;其poll方法先执行父类的poll方法,然后在timer.expired()为true时执行stop(),最后返回null

小结

BlockingReader实现了Reader接口,其start方法设置state为State.RUNNING,其stop方法设置state为State.STOPPED,同时执行completionHandler.run();其poll方法在state为State.STOPPED直接返回null,否则执行metronome.pause(),然后设置state为State.STOPPED,最后返回null

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Reader
  • BlockingReader
  • TimedBlockingReader
  • 小结
  • doc
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档