
A Look at Debezium's SimpleSourceConnector

code4it
Published 2020-05-25 16:35:24
Included in the column: 码匠的流水账

This article takes a look at Debezium's SimpleSourceConnector.

SimpleSourceConnector

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

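package io.debezium.connector.simple;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import io.debezium.config.Configuration;
import io.debezium.util.Collect;

public class SimpleSourceConnector extends SourceConnector {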

    protected static final String VERSION = "1.0";

    public static final String TOPIC_NAME = "topic.name";
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    public static final String BATCH_COUNT = "batch.count";
    public static final String DEFAULT_TOPIC_NAME = "simple.topic";
    public static final String INCLUDE_TIMESTAMP = "include.timestamp";
    public static final String RETRIABLE_ERROR_ON = "error.retriable.on";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 1;
    public static final int DEFAULT_BATCH_COUNT = 10;
    public static final boolean DEFAULT_INCLUDE_TIMESTAMP = false;

    private Map<String, String> config;

    public SimpleSourceConnector() {
    }

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleSourceConnector.SimpleConnectorTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    @Override
    public void stop() {
        // do nothing
    }

    @Override
    public ConfigDef config() {
        return null;
    }

    //......

}
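  • SimpleSourceConnector extends Kafka Connect's SourceConnector. Its start method simply stores the supplied properties, taskClass returns SimpleSourceConnector.SimpleConnectorTask.class, and taskConfigs ignores maxTasks and returns a single-element list holding that same configuration, so only one task is ever created. config() returns null, i.e. the connector declares no ConfigDef.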
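To make the task fan-out concrete, here is a minimal plain-Java sketch that mirrors only the start/taskConfigs logic quoted above. It is an illustration under stated assumptions: it does not depend on Kafka Connect, the class name TaskConfigSketch is hypothetical, and the sample property values are made up for the demo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SimpleSourceConnector's config fan-out.
// It has no Kafka Connect dependency and only mirrors start()/taskConfigs().
class TaskConfigSketch {

    private Map<String, String> config;

    // Mirrors SimpleSourceConnector.start(): keep the raw properties as-is.
    public void start(Map<String, String> props) {
        config = props;
    }

    // Mirrors SimpleSourceConnector.taskConfigs(): maxTasks is ignored and
    // exactly one task config (the connector config itself) is returned.
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    public static void main(String[] args) {
        // Illustrative values for the keys defined by the connector.
        Map<String, String> props = new HashMap<>();
        props.put("topic.name", "simple.topic");
        props.put("record.count.per.batch", "2");
        props.put("batch.count", "3");

        TaskConfigSketch connector = new TaskConfigSketch();
        connector.start(props);
        List<Map<String, String>> configs = connector.taskConfigs(8);
        System.out.println(configs.size()); // prints 1, although maxTasks is 8
    }
}
```

Note that the single task config is the very map passed to start, not a copy, so the task sees exactly the properties the connector received.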

SimpleConnectorTask

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java

    public static class SimpleConnectorTask extends SourceTask {

        private int recordsPerBatch;
        private int errorOnRecord;
        private Queue<SourceRecord> records;
        private final AtomicBoolean running = new AtomicBoolean();
        private List<SourceRecord> retryRecords = null;

        @Override
        public String version() {
            return VERSION;
        }

        @Override
        public void start(Map<String, String> props) {
            if (running.compareAndSet(false, true)) {
                Configuration config = Configuration.from(props);
                recordsPerBatch = config.getInteger(RECORD_COUNT_PER_BATCH, DEFAULT_RECORD_COUNT_PER_BATCH);
                int batchCount = config.getInteger(BATCH_COUNT, DEFAULT_BATCH_COUNT);
                String topic = config.getString(TOPIC_NAME, DEFAULT_TOPIC_NAME);
                boolean includeTimestamp = config.getBoolean(INCLUDE_TIMESTAMP, DEFAULT_INCLUDE_TIMESTAMP);
                errorOnRecord = config.getInteger(RETRIABLE_ERROR_ON, -1);

                // Create the partition and schemas ...
                Map<String, ?> partition = Collect.hashMapOf("source", "simple");
                Schema keySchema = SchemaBuilder.struct()
                        .name("simple.key")
                        .field("id", Schema.INT32_SCHEMA)
                        .build();
                Schema valueSchema = SchemaBuilder.struct()
                        .name("simple.value")
                        .field("batch", Schema.INT32_SCHEMA)
                        .field("record", Schema.INT32_SCHEMA)
                        .field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
                        .build();

                // Read the offset ...
                Map<String, ?> lastOffset = context.offsetStorageReader().offset(partition);
                long lastId = lastOffset == null ? 0L : (Long) lastOffset.get("id");

                // Generate the records that we need ...
                records = new LinkedList<>();
                long initialTimestamp = System.currentTimeMillis();
                int id = 0;
                for (int batch = 0; batch != batchCount; ++batch) {
                    for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                        ++id;
                        if (id <= lastId) {
                            // We already produced this record, so skip it ...
                            continue;
                        }
                        if (!running.get()) {
                            // the task has been stopped ...
                            return;
                        }
                        // We've not seen this ID yet, so create a record ...
                        Map<String, ?> offset = Collect.hashMapOf("id", id);
                        Struct key = new Struct(keySchema);
                        key.put("id", id);
                        Struct value = new Struct(valueSchema);
                        value.put("batch", batch + 1);
                        value.put("record", recordNum + 1);
                        if (includeTimestamp) {
                            value.put("timestamp", initialTimestamp + id);
                        }
                        SourceRecord record = new SourceRecord(partition, offset, topic, 1, keySchema, key, valueSchema, value);
                        records.add(record);
                    }
                }
            }
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (records.isEmpty()) {
                // block forever, as this thread will be interrupted if/when the task is stopped ...
                new CountDownLatch(1).await();
            }
            if (running.get()) {
                if (retryRecords != null) {
                    final List<SourceRecord> r = retryRecords;
                    retryRecords = null;
                    return r;
                }
                // Still running, so process whatever is in the queue ...
                List<SourceRecord> results = new ArrayList<>();
                int record = 0;
                while (record < recordsPerBatch && !records.isEmpty()) {
                    record++;
                    final SourceRecord fetchedRecord = records.poll();
                    final Integer id = ((Struct) (fetchedRecord.key())).getInt32("id");
                    results.add(fetchedRecord);
                    if (id == errorOnRecord) {
                        retryRecords = results;
                        throw new RetriableException("Error on record " + errorOnRecord);
                    }
                }
                return results;
            }
            // No longer running ...
            return null;
        }

        @Override
        public void stop() {
            // Request the task to stop and return immediately ...
            running.set(false);
        }
    }
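  • SimpleConnectorTask extends Kafka Connect's SourceTask. Its start method reads the configuration, looks up the last committed offset for the partition {source=simple}, and builds batchCount * recordsPerBatch SourceRecords up front in an in-memory queue, skipping every id at or below the stored offset. Its stop method just sets running to false. Its poll method takes up to recordsPerBatch records off the queue via records.poll(): it blocks until interrupted while the queue is empty, throws a RetriableException on the record whose id equals error.retriable.on and hands that partial batch back on the next call, and returns null once the task is no longer running.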
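The offset-based resume in start can be modeled without Kafka Connect. The sketch below is a hypothetical simplification, not Debezium code: SimpleRecord stands in for the SourceRecord/Struct pair, and the lastId parameter plays the role of the id read through context.offsetStorageReader().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the generation loop in
// SimpleConnectorTask.start().
class RecordGenerationSketch {

    // Stand-in for one SourceRecord: the key id plus the 1-based
    // batch/record numbers the real task puts into its "simple.value" struct.
    static final class SimpleRecord {
        final int id;
        final int batch;
        final int record;

        SimpleRecord(int id, int batch, int record) {
            this.id = id;
            this.batch = batch;
            this.record = record;
        }

        @Override
        public String toString() {
            return "id=" + id + " batch=" + batch + " record=" + record;
        }
    }

    // Ids are assigned sequentially from 1 across all batches; any id at or
    // below lastId (the previously committed offset) is skipped, so a
    // restarted task resumes right after the last record it produced.
    static List<SimpleRecord> generate(int batchCount, int recordsPerBatch, long lastId) {
        List<SimpleRecord> records = new ArrayList<>();
        int id = 0;
        for (int batch = 0; batch != batchCount; ++batch) {
            for (int recordNum = 0; recordNum != recordsPerBatch; ++recordNum) {
                ++id;
                if (id <= lastId) {
                    continue; // already produced before the restart
                }
                records.add(new SimpleRecord(id, batch + 1, recordNum + 1));
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // 3 batches of 2 records, with offset id=3 already committed.
        for (SimpleRecord r : generate(3, 2, 3L)) {
            System.out.println(r);
        }
    }
}
```

With 3 batches of 2 records and a committed offset of 3, only ids 4, 5 and 6 are generated, carrying batch/record numbers 2/2, 3/1 and 3/2. The numbering stays stable across restarts because ids are recomputed from the loop rather than stored anywhere except in the offset.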

Summary

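SimpleSourceConnector extends Kafka Connect's SourceConnector, and its taskClass returns SimpleSourceConnector.SimpleConnectorTask.class. SimpleConnectorTask extends Kafka Connect's SourceTask: its start method pre-generates the SourceRecords according to batchCount and record.count.per.batch, its stop method sets running to false, and its poll method mainly drains that queue with records.poll(), returning up to recordsPerBatch records per call.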
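The poll path, including the retry behavior driven by error.retriable.on in the quoted task, can be simulated the same way. In this hypothetical sketch integer ids stand in for SourceRecords, SimulatedRetriableException stands in for Kafka Connect's RetriableException, the running flag is omitted, and an empty queue returns an empty list where the real task blocks until interrupted.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical, dependency-free model of SimpleConnectorTask.poll().
class PollRetrySketch {

    // Stand-in for org.apache.kafka.connect.errors.RetriableException.
    static class SimulatedRetriableException extends RuntimeException {
        SimulatedRetriableException(String message) {
            super(message);
        }
    }

    private final Queue<Integer> records = new LinkedList<>();
    private final int recordsPerBatch;
    private final int errorOnRecord;
    private List<Integer> retryRecords = null;

    PollRetrySketch(int recordCount, int recordsPerBatch, int errorOnRecord) {
        for (int id = 1; id <= recordCount; id++) {
            records.add(id);
        }
        this.recordsPerBatch = recordsPerBatch;
        this.errorOnRecord = errorOnRecord;
    }

    List<Integer> poll() {
        if (records.isEmpty()) {
            // The real task blocks here until interrupted; the sketch returns
            // an empty batch. Like the original, this check precedes the retry check.
            return new ArrayList<>();
        }
        if (retryRecords != null) {
            // Replay the batch that failed on the previous call.
            List<Integer> r = retryRecords;
            retryRecords = null;
            return r;
        }
        List<Integer> results = new ArrayList<>();
        int record = 0;
        while (record < recordsPerBatch && !records.isEmpty()) {
            record++;
            int id = records.poll();
            results.add(id);
            if (id == errorOnRecord) {
                // Keep the partial batch (including the failing record)
                // so the next poll() hands it out again.
                retryRecords = results;
                throw new SimulatedRetriableException("Error on record " + errorOnRecord);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // 6 records, 2 per batch, failure configured on record 3.
        PollRetrySketch task = new PollRetrySketch(6, 2, 3);
        for (int call = 1; call <= 5; call++) {
            try {
                System.out.println("poll " + call + ": " + task.poll());
            } catch (SimulatedRetriableException e) {
                System.out.println("poll " + call + ": " + e.getMessage());
            }
        }
    }
}
```

For 6 records, 2 per batch and a failure on record 3, the five calls yield [1, 2], the error, the replayed [3], [4, 5] and [6]. The failing record leaves the queue before the exception is thrown, so the error fires only once. Because the empty-queue check runs before the retry check, in the quoted code a failure configured on the very last record would leave poll() blocked instead of replaying the saved batch; the sketch keeps that ordering.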

doc

  • SimpleSourceConnector
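This article is part of the Tencent Cloud Self-Media Sharing Program and was shared from a WeChat official account.
Originally published: 2020-05-17. In case of infringement, contact cloudcommunity@tencent.com for removal.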

This article is shared from the 码匠的流水账 WeChat official account.
