A Look at DebeziumEngine

code4it
Last modified 2020-05-18 11:04:55

This article takes a look at DebeziumEngine.

DebeziumEngine

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

@Incubating
public interface DebeziumEngine<R> extends Runnable, Closeable {

    //......

    public static <T> Builder<T> create(Class<? extends ChangeEventFormat<T>> eventFormat) {
        // Discover Builder implementations through the JDK ServiceLoader
        final ServiceLoader<Builder> loader = ServiceLoader.load(Builder.class);
        final Iterator<Builder> iterator = loader.iterator();
        if (!iterator.hasNext()) {
            throw new DebeziumException("No implementation of Debezium engine builder was found");
        }
        // Use the first implementation; only warn if there are others
        final Builder builder = iterator.next();
        if (iterator.hasNext()) {
            LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass());
        }
        return builder;
    }

    //......

}
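  • DebeziumEngine provides a static create method that takes an eventFormat parameter. It calls ServiceLoader.load(Builder.class) to discover Builder implementations and returns the first one found. If none is found it throws a DebeziumException; if more than one is found it logs a warning and still uses the first. In the excerpt shown, the eventFormat argument is not used inside the method body.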
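
The lookup described above can be tried without Debezium on the classpath. The following is a minimal JDK-only sketch of the same "first provider wins" selection; the class name FirstProviderLookup, the pickFirst and load helpers, and the WARNINGS list are hypothetical names introduced for illustration, and IllegalStateException stands in for DebeziumException.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

/** JDK-only sketch of a "first provider wins" lookup; all names here are hypothetical. */
class FirstProviderLookup {

    /** Warnings are collected instead of logged, so the sketch needs no logging library. */
    static final List<String> WARNINGS = new ArrayList<>();

    /** Returns the first element, fails if there is none, and records a warning if there are more. */
    static <T> T pickFirst(Iterator<T> providers, String what) {
        if (!providers.hasNext()) {
            throw new IllegalStateException("No implementation of " + what + " was found");
        }
        T first = providers.next();
        if (providers.hasNext()) {
            WARNINGS.add("More than one " + what + " implementation was found, using " + first.getClass());
        }
        return first;
    }

    /** Same selection driven by ServiceLoader, which reads META-INF/services entries on the classpath. */
    static <T> T load(Class<T> service) {
        return pickFirst(ServiceLoader.load(service).iterator(), service.getName());
    }

    public static void main(String[] args) {
        // Two candidates: the first one is returned and one warning is recorded.
        System.out.println(pickFirst(List.of("a", "b").iterator(), "demo"));
        System.out.println(WARNINGS);
    }
}
```

One consequence of this design is that which builder gets used depends on classpath order when several implementations are present, which is presumably why the original code logs a warning in that case.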

DebeziumEngine.Builder

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

    public static interface Builder<R> {

        // Where change events are delivered: one record at a time, or in batches
        Builder<R> notifying(Consumer<R> consumer);

        Builder<R> notifying(ChangeConsumer<R> handler);

        // Engine and connector configuration
        Builder<R> using(Properties config);

        Builder<R> using(ClassLoader classLoader);

        Builder<R> using(Clock clock);

        // Optional lifecycle callbacks and offset commit policy
        Builder<R> using(CompletionCallback completionCallback);

        Builder<R> using(ConnectorCallback connectorCallback);

        Builder<R> using(OffsetCommitPolicy policy);

        DebeziumEngine<R> build();
    }
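  • The Builder interface defines three groups of methods: notifying (registers either a per-record Consumer or a batch ChangeConsumer), using (supplies the configuration, ClassLoader, Clock, callbacks and OffsetCommitPolicy), and build (creates the DebeziumEngine).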
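
As an illustration of what might be handed to using(Properties), here is a small JDK-only sketch that assembles such a Properties object. The keys name, connector.class and offset.storage correspond to the ENGINE_NAME, CONNECTOR_CLASS and OFFSET_STORAGE fields read later in EmbeddedEngine.run(); the class name EngineConfigSketch and all values (engine name, file path, host, credentials) are hypothetical placeholders, and this is not a complete connector configuration.

```java
import java.util.Properties;

/** Sketch of a configuration for Builder.using(Properties); all values are placeholders. */
class EngineConfigSketch {

    static Properties mysqlExample() {
        Properties props = new Properties();
        // Engine-level settings: engine name, connector class and offset store
        props.setProperty("name", "demo-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/demo-offsets.dat");
        props.setProperty("offset.flush.interval.ms", "60000");
        // Connector-level settings (assumed values; a real MySQL connector needs more than these)
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "demo_user");
        props.setProperty("database.password", "demo_password");
        return props;
    }

    public static void main(String[] args) {
        mysqlExample().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```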

EmbeddedEngine.BuilderImpl

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

    public static final class BuilderImpl implements Builder {
        private Configuration config;
        private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy = null;
​
        @Override
        public Builder using(Configuration config) {
            this.config = config;
            return this;
        }
​
        @Override
        public Builder using(Properties config) {
            this.config = Configuration.from(config);
            return this;
        }
​
        @Override
        public Builder using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }
​
        @Override
        public Builder using(Clock clock) {
            this.clock = clock;
            return this;
        }
​
        @Override
        public Builder using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }
​
        @Override
        public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }
​
        @Override
        public Builder using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }
​
        @Override
        public Builder notifying(Consumer<SourceRecord> consumer) {
            this.handler = buildDefaultChangeConsumer(consumer);
            return this;
        }
​
        @Override
        public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
            this.handler = handler;
            return this;
        }
​
        @Override
        public Builder using(java.time.Clock clock) {
            return using(new Clock() {
​
                @Override
                public long currentTimeInMillis() {
                    return clock.millis();
                }
            });
        }
​
        @Override
        public EmbeddedEngine build() {
            if (classLoader == null) {
                classLoader = getClass().getClassLoader();
            }
            if (clock == null) {
                clock = Clock.system();
            }
            Objects.requireNonNull(config, "A connector configuration must be specified.");
            Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified.");
            return new EmbeddedEngine(config, classLoader, clock,
                    handler, completionCallback, connectorCallback, offsetCommitPolicy);
        }
​
        // backward compatibility methods
        @Override
        public Builder using(CompletionCallback completionCallback) {
            return using((DebeziumEngine.CompletionCallback) completionCallback);
        }
​
        @Override
        public Builder using(ConnectorCallback connectorCallback) {
            return using((DebeziumEngine.ConnectorCallback) connectorCallback);
        }
    }
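  • BuilderImpl implements the Builder interface. Its build method falls back to its own ClassLoader and to Clock.system() when none were supplied, requires a non-null config and handler, and then creates an EmbeddedEngine. The unqualified Builder here is presumably EmbeddedEngine's own nested Builder interface rather than DebeziumEngine.Builder directly, since BuilderImpl also overrides using(Configuration), which DebeziumEngine.Builder does not declare.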
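
The default-then-validate shape of build() can be shown in isolation. The following is a hypothetical miniature that uses only JDK types; MiniEngine and its Builder are illustration names, not Debezium classes, and a plain Consumer<String> stands in for the change consumer.

```java
import java.time.Clock;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Consumer;

/** Hypothetical miniature of a fluent builder with defaults and required fields; not Debezium code. */
final class MiniEngine {
    final Properties config;
    final Consumer<String> handler;
    final Clock clock;

    private MiniEngine(Properties config, Consumer<String> handler, Clock clock) {
        this.config = config;
        this.handler = handler;
        this.clock = clock;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private Properties config;
        private Consumer<String> handler;
        private Clock clock;

        Builder using(Properties config) {
            this.config = config;
            return this;
        }

        Builder using(Clock clock) {
            this.clock = clock;
            return this;
        }

        Builder notifying(Consumer<String> handler) {
            this.handler = handler;
            return this;
        }

        MiniEngine build() {
            // Optional collaborators get a default ...
            if (clock == null) {
                clock = Clock.systemUTC();
            }
            // ... while the configuration and the handler are mandatory.
            Objects.requireNonNull(config, "A connector configuration must be specified.");
            Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified.");
            return new MiniEngine(config, handler, clock);
        }
    }
}
```

Validating in build() rather than in each setter lets callers chain the using and notifying calls in any order and still get a single, clear failure when something mandatory is missing.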

EmbeddedEngine

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
​
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Configuration config;
    private final Clock clock;
    private final ClassLoader classLoader;
    private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;
    private final DebeziumEngine.CompletionCallback completionCallback;
    private final DebeziumEngine.ConnectorCallback connectorCallback;
    private final AtomicReference<Thread> runningThread = new AtomicReference<>();
    private final VariableLatch latch = new VariableLatch(0);
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final WorkerConfig workerConfig;
    private final CompletionResult completionResult;
    private long recordsSinceLastCommit = 0;
    private long timeOfLastCommitMillis = 0;
    private OffsetCommitPolicy offsetCommitPolicy;
​
    private SourceTask task;
​
    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler,
                           DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback,
                           OffsetCommitPolicy offsetCommitPolicy) {
        this.config = config;
        this.handler = handler;
        this.classLoader = classLoader;
        this.clock = clock;
        this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {
            if (!success) {
                logger.error(msg, error);
            }
        };
        this.connectorCallback = connectorCallback;
        this.completionResult = new CompletionResult();
        this.offsetCommitPolicy = offsetCommitPolicy;
​
        assert this.config != null;
        assert this.handler != null;
        assert this.classLoader != null;
        assert this.clock != null;
        keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);
        Configuration valueConverterConfig = config;
        if (valueConverter instanceof JsonConverter) {
            // Make sure that the JSON converter is configured to NOT enable schemas ...
            valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();
        }
        valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);
​
        // Create the worker config, adding extra fields that are required for validation of a worker config
        // but that are not used within the embedded engine (since the source records are never serialized) ...
        Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);
        embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
        workerConfig = new EmbeddedConfig(embeddedConfig);
    }
​
    public void run() {
        if (runningThread.compareAndSet(null, Thread.currentThread())) {
​
            final String engineName = config.getString(ENGINE_NAME);
            final String connectorClassName = config.getString(CONNECTOR_CLASS);
            final Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
            // Only one thread can be in this part of the method at a time ...
            latch.countUp();
            try {
                if (!config.validateAndRecord(CONNECTOR_FIELDS, logger::error)) {
                    fail("Failed to start connector with invalid configuration (see logs for actual errors)");
                    return;
                }
​
                // Instantiate the connector ...
                SourceConnector connector = null;
                try {
                    @SuppressWarnings("unchecked")
                    Class<? extends SourceConnector> connectorClass = (Class<SourceConnector>) classLoader.loadClass(connectorClassName);
                    connector = connectorClass.getDeclaredConstructor().newInstance();
                }
                catch (Throwable t) {
                    fail("Unable to instantiate connector class '" + connectorClassName + "'", t);
                    return;
                }
​
                // Instantiate the offset store ...
                final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
                OffsetBackingStore offsetStore = null;
                try {
                    @SuppressWarnings("unchecked")
                    Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);
                    offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance();
                }
                catch (Throwable t) {
                    fail("Unable to instantiate OffsetBackingStore class '" + offsetStoreClassName + "'", t);
                    return;
                }
​
                // Initialize the offset store ...
                try {
                    offsetStore.configure(workerConfig);
                    offsetStore.start();
                }
                catch (Throwable t) {
                    fail("Unable to configure and start the '" + offsetStoreClassName + "' offset backing store", t);
                    return;
                }
​
                // Set up the offset commit policy ...
                if (offsetCommitPolicy == null) {
                    offsetCommitPolicy = config.getInstance(EmbeddedEngine.OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class, config);
                }
​
                // Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
                ConnectorContext context = new ConnectorContext() {
​
                    @Override
                    public void requestTaskReconfiguration() {
                        // Do nothing ...
                    }
​
                    @Override
                    public void raiseError(Exception e) {
                        fail(e.getMessage(), e);
                    }
​
                };
                connector.initialize(context);
                OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName,
                        keyConverter, valueConverter);
                OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName,
                        keyConverter, valueConverter);
                Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
​
                try {
                    // Start the connector with the given properties and get the task configurations ...
                    connector.start(config.asMap());
                    connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
                    List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
                    Class<? extends Task> taskClass = connector.taskClass();
                    task = null;
                    try {
                        task = (SourceTask) taskClass.getDeclaredConstructor().newInstance();
                    }
                    catch (IllegalAccessException | InstantiationException t) {
                        fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                        return;
                    }
                    try {
                        SourceTaskContext taskContext = new SourceTaskContext() {
                            @Override
                            public OffsetStorageReader offsetStorageReader() {
                                return offsetReader;
                            }
​
                            public Map<String, String> configs() {
                                // TODO Auto-generated method stub
                                return null;
                            }
                        };
                        task.initialize(taskContext);
                        task.start(taskConfigs.get(0));
                        connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
                    }
                    catch (Throwable t) {
                        // Mask the passwords ...
                        Configuration config = Configuration.from(taskConfigs.get(0)).withMaskedPasswords();
                        String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: "
                                + config;
                        fail(msg, t);
                        return;
                    }
​
                    recordsSinceLastCommit = 0;
                    Throwable handlerError = null;
                    try {
                        timeOfLastCommitMillis = clock.currentTimeInMillis();
                        RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
                        while (runningThread.get() != null) {
                            List<SourceRecord> changeRecords = null;
                            try {
                                logger.debug("Embedded engine is polling task for records on thread {}", runningThread.get());
                                changeRecords = task.poll(); // blocks until there are values ...
                                logger.debug("Embedded engine returned from polling task for records");
                            }
                            catch (InterruptedException e) {
                                // Interrupted while polling ...
                                logger.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get());
                                if (this.runningThread.get() == Thread.currentThread()) {
                                    // this thread is still set as the running thread -> we were not interrupted
                                    // due the stop() call -> probably someone else called the interrupt on us ->
                                    // -> we should raise the interrupt flag
                                    Thread.currentThread().interrupt();
                                }
                                break;
                            }
                            catch (RetriableException e) {
                                logger.info("Retrieable exception thrown, connector will be restarted", e);
                                // Retriable exception should be ignored by the engine
                                // and no change records delivered.
                                // The retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
                            }
                            try {
                                if (changeRecords != null && !changeRecords.isEmpty()) {
                                    logger.debug("Received {} records from the task", changeRecords.size());
​
                                    try {
                                        handler.handleBatch(changeRecords, committer);
                                    }
                                    catch (StopConnectorException e) {
                                        break;
                                    }
                                }
                                else {
                                    logger.debug("Received no records from the task");
                                }
                            }
                            catch (Throwable t) {
                                // There was some sort of unexpected exception, so we should stop work
                                handlerError = t;
                                break;
                            }
                        }
                    }
                    finally {
                        if (handlerError != null) {
                            // There was an error in the handler so make sure it's always captured...
                            fail("Stopping connector after error in the application's handler method: " + handlerError.getMessage(),
                                    handlerError);
                        }
                        try {
                            // First stop the task ...
                            logger.debug("Stopping the task and engine");
                            task.stop();
                            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);
                            // Always commit offsets that were captured from the source records we actually processed ...
                            commitOffsets(offsetWriter, commitTimeout, task);
                            if (handlerError == null) {
                                // We stopped normally ...
                                succeed("Connector '" + connectorClassName + "' completed normally.");
                            }
                        }
                        catch (Throwable t) {
                            fail("Error while trying to stop the task and commit the offsets", t);
                        }
                    }
                }
                catch (Throwable t) {
                    fail("Error while trying to run connector class '" + connectorClassName + "'", t);
                }
                finally {
                    // Close the offset storage and finally the connector ...
                    try {
                        offsetStore.stop();
                    }
                    catch (Throwable t) {
                        fail("Error while trying to stop the offset store", t);
                    }
                    finally {
                        try {
                            connector.stop();
                            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);
                        }
                        catch (Throwable t) {
                            fail("Error while trying to stop connector class '" + connectorClassName + "'", t);
                        }
                    }
                }
            }
            finally {
                latch.countDown();
                runningThread.set(null);
                // after we've "shut down" the engine, fire the completion callback based on the results we collected
                completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error());
            }
        }
    }
​
    public boolean stop() {
        logger.debug("Stopping the embedded engine");
        // Signal that the run() method should stop ...
        Thread thread = this.runningThread.getAndSet(null);
        if (thread != null) {
            try {
                latch.await(
                        Long.valueOf(
                                System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis()))),
                        TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
            }
            logger.debug("Interrupting the embedded engine's thread {} (already interrupted: {})", thread, thread.isInterrupted());
            // Interrupt the thread in case it is blocked while polling the task for records ...
            thread.interrupt();
            return true;
        }
        return false;
    }
​
    @Override
    public void close() throws IOException {
        stop();
    }
​
}
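  • EmbeddedEngine implements the DebeziumEngine interface. Its run method first claims the engine with runningThread.compareAndSet(null, Thread.currentThread()) and validates the configuration. It then instantiates the SourceConnector and the OffsetBackingStore, calls offsetStore.configure(workerConfig) and offsetStore.start(), followed by connector.initialize(context) and connector.start(config.asMap()). Next it creates the connector task and calls task.initialize(taskContext) and task.start(taskConfigs.get(0)). After that, a while loop that runs as long as runningThread is non-null calls task.poll() to fetch changeRecords and hands them to the application through handler.handleBatch(changeRecords, committer). In the finally blocks it calls task.stop() and commitOffsets, stops the offset store and the connector, and finally invokes the completionCallback. The stop method clears runningThread, waits on the latch, and then interrupts the engine thread in case it is blocked in poll.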
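
The handshake between run() and stop() is easier to see in a reduced form. The following is a hypothetical JDK-only miniature, not Debezium code: a BlockingQueue stands in for the SourceTask, a Consumer<List<String>> stands in for ChangeConsumer.handleBatch, and a CountDownLatch replaces VariableLatch. Unlike the quoted stop(), this sketch interrupts the worker only if it has not finished within the grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical miniature of the run()/stop() handshake; not Debezium code. */
final class MiniPollLoop implements Runnable {
    private final AtomicReference<Thread> runningThread = new AtomicReference<>();
    private final CountDownLatch finished = new CountDownLatch(1);
    private final BlockingQueue<String> source;   // stands in for the SourceTask
    private final Consumer<List<String>> handler; // stands in for handleBatch

    MiniPollLoop(BlockingQueue<String> source, Consumer<List<String>> handler) {
        this.source = source;
        this.handler = handler;
    }

    @Override
    public void run() {
        // Only the first caller becomes the running thread.
        if (!runningThread.compareAndSet(null, Thread.currentThread())) {
            return;
        }
        try {
            while (runningThread.get() != null) {
                String first;
                try {
                    first = source.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Still registered as running: the interrupt did not come from stop(), so re-raise it.
                    if (runningThread.get() == Thread.currentThread()) {
                        Thread.currentThread().interrupt();
                    }
                    break;
                }
                if (first == null) {
                    continue; // nothing arrived in this interval
                }
                List<String> batch = new ArrayList<>();
                batch.add(first);
                source.drainTo(batch);
                handler.accept(batch);
            }
        } finally {
            runningThread.set(null);
            finished.countDown();
        }
    }

    /** Signals the loop to end; returns false if the loop was not running. */
    boolean stop() {
        Thread thread = runningThread.getAndSet(null);
        if (thread == null) {
            return false;
        }
        try {
            // Give the loop a grace period, then interrupt it if it is still busy.
            if (!finished.await(1, TimeUnit.SECONDS)) {
                thread.interrupt();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /** Runs the loop on its own thread, feeds it the input, stops it, and returns what the handler saw. */
    static List<String> demo(List<String> input) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(input);
        List<String> seen = new CopyOnWriteArrayList<>();
        MiniPollLoop loop = new MiniPollLoop(queue, seen::addAll);
        Thread worker = new Thread(loop, "mini-poll-loop");
        worker.setDaemon(true);
        worker.start();
        try {
            long deadline = System.currentTimeMillis() + 5_000;
            while (seen.size() < input.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            loop.stop();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }
}
```

Using the thread reference itself as the running flag serves two purposes at once: the loop condition observes the reset done by stop(), and stop() obtains the exact thread it may need to interrupt.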

Summary

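DebeziumEngine provides a create method that takes an eventFormat parameter, loads Builder implementations with ServiceLoader.load(Builder.class), and returns the first builder found. BuilderImpl implements the Builder interface, and its build method creates an EmbeddedEngine.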

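Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.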
