
A Look at Debezium's SimpleSourceConnector

This article takes a look at Debezium's SimpleSourceConnector.

SimpleSourceConnector

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

public class SimpleSourceConnector extends SourceConnector {

    protected static final String VERSION = "1.0";

    public static final String TOPIC_NAME = "topic.name";
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    public static final String BATCH_COUNT = "batch.count";
    public static final String DEFAULT_TOPIC_NAME = "simple.topic";
    public static final String INCLUDE_TIMESTAMP = "include.timestamp";
    public static final String RETRIABLE_ERROR_ON = "error.retriable.on";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 1;
    public static final int DEFAULT_BATCH_COUNT = 10;
    public static final boolean DEFAULT_INCLUDE_TIMESTAMP = false;

    private Map<String, String> config;

    public SimpleSourceConnector() {
    }

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceConnector.SimpleConnectorTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    @Override
    public void stop() {
        // do nothing
    }

    @Override
    public ConfigDef config() {
        return null;
    }

    //......

}
  • SimpleSourceConnector extends Kafka's SourceConnector. Its taskClass method returns SimpleSourceConnector.SimpleConnectorTask.class, and its taskConfigs method ignores maxTasks and returns a single-element list holding the props the connector was started with, so at most one task is created.
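The config-propagation behavior above can be sketched without the Kafka Connect dependency. `TaskConfigSketch` below is a hypothetical stand-in for SimpleSourceConnector (the class name and the absence of the SourceConnector base class are my simplifications, not part of Debezium); it shows that the same props map handed to start is passed through to the single task config, regardless of maxTasks:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SimpleSourceConnector's config handling:
// the connector stores the props it was started with and hands the
// same map to every task it declares.
public class TaskConfigSketch {

    private Map<String, String> config;

    public void start(Map<String, String> props) {
        config = props;
    }

    // Mirrors SimpleSourceConnector#taskConfigs: a single entry, so only
    // one task is ever created even if maxTasks is larger.
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("topic.name", "simple.topic");
        props.put("batch.count", "10");

        TaskConfigSketch connector = new TaskConfigSketch();
        connector.start(props);

        List<Map<String, String>> configs = connector.taskConfigs(4);
        System.out.println(configs.size());                   // 1
        System.out.println(configs.get(0).get("topic.name")); // simple.topic
    }
}
```

Note that a production connector would normally split work across maxTasks configs; returning a single config is what makes this connector "simple".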

SimpleConnectorTask

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

    public static class SimpleConnectorTask extends SourceTask {

        private int recordsPerBatch;
        private int errorOnRecord;
        private Queue<SourceRecord> records;
        private final AtomicBoolean running = new AtomicBoolean();
        private List<SourceRecord> retryRecords = null;

        @Override
        public String version() {
            return VERSION;
        }

        @Override
        public void start(Map<String, String> props) {
            if (running.compareAndSet(false, true)) {
                Configuration config = Configuration.from(props);
                recordsPerBatch = config.getInteger(RECORD_COUNT_PER_BATCH, DEFAULT_RECORD_COUNT_PER_BATCH);
                int batchCount = config.getInteger(BATCH_COUNT, DEFAULT_BATCH_COUNT);
                String topic = config.getString(TOPIC_NAME, DEFAULT_TOPIC_NAME);
                boolean includeTimestamp = config.getBoolean(INCLUDE_TIMESTAMP, DEFAULT_INCLUDE_TIMESTAMP);
                errorOnRecord = config.getInteger(RETRIABLE_ERROR_ON, -1);

                // Create the partition and schemas ...
                Map<String, ?> partition = Collect.hashMapOf("source", "simple");
                Schema keySchema = SchemaBuilder.struct()
                        .name("simple.key")
                        .field("id", Schema.INT32_SCHEMA)
                        .build();
                Schema valueSchema = SchemaBuilder.struct()
                        .name("simple.value")
                        .field("batch", Schema.INT32_SCHEMA)
                        .field("record", Schema.INT32_SCHEMA)
                        .field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
                        .build();

                // Read the offset ...
                Map<String, ?> lastOffset = context.offsetStorageReader().offset(partition);
                long lastId = lastOffset == null ? 0L : (Long) lastOffset.get("id");

                // Generate the records that we need ...
                records = new LinkedList<>();
                long initialTimestamp = System.currentTimeMillis();
                int id = 0;
                for (int batch = 0; batch != batchCount; ++batch) {
                    for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                        ++id;
                        if (id <= lastId) {
                            // We already produced this record, so skip it ...
                            continue;
                        }
                        if (!running.get()) {
                            // the task has been stopped ...
                            return;
                        }
                        // We've not seen this ID yet, so create a record ...
                        Map<String, ?> offset = Collect.hashMapOf("id", id);
                        Struct key = new Struct(keySchema);
                        key.put("id", id);
                        Struct value = new Struct(valueSchema);
                        value.put("batch", batch + 1);
                        value.put("record", recordNum + 1);
                        if (includeTimestamp) {
                            value.put("timestamp", initialTimestamp + id);
                        }
                        SourceRecord record = new SourceRecord(partition, offset, topic, 1, keySchema, key, valueSchema, value);
                        records.add(record);
                    }
                }
            }
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (records.isEmpty()) {
                // block forever, as this thread will be interrupted if/when the task is stopped ...
                new CountDownLatch(1).await();
            }
            if (running.get()) {
                if (retryRecords != null) {
                    final List<SourceRecord> r = retryRecords;
                    retryRecords = null;
                    return r;
                }
                // Still running, so process whatever is in the queue ...
                List<SourceRecord> results = new ArrayList<>();
                int record = 0;
                while (record < recordsPerBatch && !records.isEmpty()) {
                    record++;
                    final SourceRecord fetchedRecord = records.poll();
                    final Integer id = ((Struct) (fetchedRecord.key())).getInt32("id");
                    results.add(fetchedRecord);
                    if (id == errorOnRecord) {
                        retryRecords = results;
                        throw new RetriableException("Error on record " + errorOnRecord);
                    }
                }
                return results;
            }
            // No longer running ...
            return null;
        }

        @Override
        public void stop() {
            // Request the task to stop and return immediately ...
            running.set(false);
        }
    }
  • SimpleConnectorTask extends Kafka's SourceTask. Its start method pre-generates batchCount batches of recordsPerBatch SourceRecords, skipping any id at or below the last committed offset; its stop method sets running to false; its poll method drains up to recordsPerBatch records from the queue via records.poll(), throwing a RetriableException when the configured errorOnRecord id is reached and stashing the partial batch in retryRecords so the next poll can re-deliver it.
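The id-generation and offset-resume logic in start can be illustrated with plain ints in place of SourceRecords. `RecordGenerationSketch` below is a simplified sketch (the class and method names are mine); it reproduces the nested loop and the `id <= lastId` skip that makes restarts idempotent:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of SimpleConnectorTask#start's generation loop, stripped of the
// Kafka Connect types: ids are assigned sequentially across batches, and
// any id at or below the last committed offset is skipped on restart.
public class RecordGenerationSketch {

    // Returns "batch=.. record=.. id=.." strings for every record that
    // still needs to be produced after resuming from lastId.
    static List<String> generate(int batchCount, int recordsPerBatch, long lastId) {
        List<String> records = new ArrayList<>();
        int id = 0;
        for (int batch = 0; batch != batchCount; ++batch) {
            for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                ++id;
                if (id <= lastId) {
                    continue; // already produced before the restart
                }
                records.add("batch=" + (batch + 1) + " record=" + (recordNum + 1) + " id=" + id);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // 3 batches of 2 records, resuming after id 2: ids 3..6 remain.
        List<String> remaining = generate(3, 2, 2);
        System.out.println(remaining.size()); // 4
        System.out.println(remaining.get(0)); // batch=2 record=1 id=3
    }
}
```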

Summary

SimpleSourceConnector extends Kafka's SourceConnector, and its taskClass returns SimpleSourceConnector.SimpleConnectorTask.class. SimpleConnectorTask extends Kafka's SourceTask: its start method pre-generates SourceRecords according to batchCount and recordsPerBatch, its stop method sets running to false, and its poll method drains records from the queue via records.poll(), with a retryRecords buffer to re-deliver a batch after a RetriableException.
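The retry semantics of poll can also be demonstrated standalone. `RetrySketch` below uses plain ints in place of SourceRecords and RuntimeException in place of Kafka's RetriableException (both substitutions, plus the class name, are my simplifications): when the failing record is hit, the partially built batch is stashed in retryRecords and returned as-is by the next poll, so no record is lost:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of SimpleConnectorTask#poll's retry behavior: on a simulated
// transient error the partial batch is stashed, and the next poll
// re-delivers it instead of draining the queue again.
public class RetrySketch {

    private final Queue<Integer> records = new ArrayDeque<>();
    private final int recordsPerBatch;
    private final int errorOnRecord;
    private List<Integer> retryRecords = null;

    RetrySketch(int recordCount, int recordsPerBatch, int errorOnRecord) {
        this.recordsPerBatch = recordsPerBatch;
        this.errorOnRecord = errorOnRecord;
        for (int id = 1; id <= recordCount; id++) {
            records.add(id);
        }
    }

    List<Integer> poll() {
        if (retryRecords != null) {
            List<Integer> r = retryRecords;
            retryRecords = null;
            return r; // re-deliver the batch that failed last time
        }
        List<Integer> results = new ArrayList<>();
        int count = 0;
        while (count < recordsPerBatch && !records.isEmpty()) {
            count++;
            Integer id = records.poll();
            results.add(id);
            if (id == errorOnRecord) {
                retryRecords = results; // stash the partial batch for retry
                throw new RuntimeException("Error on record " + errorOnRecord);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        RetrySketch task = new RetrySketch(4, 2, 2);
        try {
            task.poll(); // hits the simulated error on id 2
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Error on record 2
        }
        System.out.println(task.poll()); // [1, 2]  (the retried batch)
        System.out.println(task.poll()); // [3, 4]
    }
}
```

Because ids 1 and 2 were already removed from the queue before the error, the failing record is not re-checked on retry, matching the one-shot error behavior of the original.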

doc

  • SimpleSourceConnector

This article was first shared via the WeChat public account 码匠的流水账 (geek_luandun), by 码匠乱炖.


Originally published: 2020-05-17
