专栏首页码匠的流水账聊聊debezium的OffsetCommitPolicy
原创

聊聊debezium的OffsetCommitPolicy

本文主要研究一下debezium的OffsetCommitPolicy

OffsetCommitPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {
​
    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
​
    static OffsetCommitPolicy always() {
        return new AlwaysCommitOffsetPolicy();
    }
​
    static OffsetCommitPolicy periodic(Properties config) {
        return new PeriodicCommitOffsetPolicy(config);
    }
​
}
  • OffsetCommitPolicy定义了performCommit方法,并提供了always静态方法用于创建AlwaysCommitOffsetPolicy;提供了periodic静态方法用于创建PeriodicCommitOffsetPolicy

AlwaysCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

    public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
​
        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return true;
        }
    }
  • AlwaysCommitOffsetPolicy实现了OffsetCommitPolicy接口,其performCommit返回true

PeriodicCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

    public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {
​
        private final Duration minimumTime;
​
        public PeriodicCommitOffsetPolicy(Properties config) {
            minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)));
        }
​
        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return timeSinceLastCommit.compareTo(minimumTime) >= 0;
        }
    }
  • PeriodicCommitOffsetPolicy实现了OffsetCommitPolicy接口,其performCommit通过timeSinceLastCommit.compareTo(minimumTime)进行判断,大于等于0返回true

RecordCommitter

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

    public static interface RecordCommitter<R> {
​
        /**
         * Marks a single record as processed, must be called for each
         * record.
         *
         * @param record the record to commit
         */
        void markProcessed(R record) throws InterruptedException;
​
        /**
         * Marks a batch as finished, this may result in committing offsets/flushing
         * data.
         * <p>
         * Should be called when a batch of records is finished being processed.
         */
        void markBatchFinished();
    }
  • RecordCommitter接口定义了markProcessed、markBatchFinished方法

EmbeddedEngine

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
​
    //......
​
    protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
        return new RecordCommitter() {
​
            @Override
            public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
                task.commitRecord(record);
                recordsSinceLastCommit += 1;
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }
​
            @Override
            public synchronized void markBatchFinished() {
                maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
            }
        };
    }
​
    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
                              SourceTask task) {
        // Determine if we need to commit to offset storage ...
        long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis;
        if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
            commitOffsets(offsetWriter, commitTimeout, task);
        }
    }
​
    protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) {
        long started = clock.currentTimeInMillis();
        long timeout = started + commitTimeout.toMillis();
        if (!offsetWriter.beginFlush()) {
            return;
        }
        Future<Void> flush = offsetWriter.doFlush(this::completedFlush);
        if (flush == null) {
            return; // no offsets to commit ...
        }
​
        // Wait until the offsets are flushed ...
        try {
            flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS);
            // if we've gotten this far, the offsets have been committed so notify the task
            task.commit();
            recordsSinceLastCommit = 0;
            timeOfLastCommitMillis = clock.currentTimeInMillis();
        }
        catch (InterruptedException e) {
            logger.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
        }
        catch (ExecutionException e) {
            logger.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException e) {
            logger.error("Timed out waiting to flush {} offsets to storage", this);
            offsetWriter.cancelFlush();
        }
    }
​
    //......
​
}    
  • EmbeddedEngine的buildRecordCommitter方法创建了一个匿名RecordCommitter实现,其markBatchFinished方法会执行maybeFlush方法,该方法会通过policy.performCommit方法来判断是否执行commitOffsets;commitOffsets方法主要执行offsetWriter.doFlush

小结

OffsetCommitPolicy定义了performCommit方法,并提供了always静态方法用于创建AlwaysCommitOffsetPolicy;提供了periodic静态方法用于创建PeriodicCommitOffsetPolicy

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊debezium的OffsetCommitPolicy

    debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCo...

    codecraft
  • 聊聊tomcat jdbc pool的默认参数及poolSweeper

    本文主要研究一下tomcat jdbc pool的默认参数及poolSweeper

    codecraft
  • java10下编译lombok注解的代码

    本文主要研究下在带有lombok(1.16.20版本)注解的代码在java10下的编译问题。

    codecraft
  • 聊聊debezium的OffsetCommitPolicy

    debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCo...

    codecraft
  • PyQt5 图形界面-基础准备:安装QtCreater工具,使用QtDesigner设计界面,安装PyQt5工具库

    PyQt5 就是一个把 QtDesigner 设计的界面转化为 python 代码的 python 库。 QtDesigner 就是 QtCreater 里的...

    小蓝枣
  • 你想去华为吗?

    菊厂作为一个拥有17万员工的大公司,由于其企业文化以及对得起工作量的薪水,吸引了广大应届生的目光。最近又由于女神华姐姐的官方广告引起了大量的关注,可以预料到华为...

    谭庆波
  • 【征信】美国P2P平台upstart不用FICO评分是怎么运作的?

    国际范围来看,年轻的消费者都是信贷需求比较强烈的人群,但是因为传统银行要求借款者数年的信用和从业记录,这一群体的信贷服务并不是很通畅。如果一个人很年轻并且没有很...

    小莹莹
  • Python让你自己做一个软件,自己开个聊天室,厉害吧!

    内容 知识点 asyncore 、asynchat模块运用 环境 python 3.5 功能描绘 在本实验中,我们将实现一个简略的图形界面谈天系统。我们可以经...

    企鹅号小编
  • 数据揭秘:春运车票有多难抢?

    传说中的春运抢票,是一项对手眼协调速度、徒手计算机操作速度、奇特图形辨识速度等个人身体能力,对计算机配置、网速、网络稳定性等硬件基础设施条件,对地区经济发展、交...

    华章科技
  • 工作预测:从深度神经网络模型到应用程序

    摘要:根据学生的工作描述(例如,困难的知识和技能)以及雇主必须如何找到选择与其所需工作相匹配的候选人的方式,确定工作适合于学生或寻找工作的人。 在本文中,我们专...

    RockNPeng

扫码关注云+社区

领取腾讯云代金券