A Look at debezium's BinlogReader

code4it
Published 2020-05-25 16:36:46
Collected in the column: 码匠的流水账

This article takes a look at debezium's BinlogReader.

Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,

        /**
         * The reader is running and generated records.
         */
        RUNNING,

        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}
  • The Reader interface defines the name, state, uponCompletion, start, stop, and poll methods, plus default no-op initialize and destroy methods
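
As a rough illustration of how a caller might drive this contract, the sketch below uses a cut-down stand-in for the interface. MiniReader, ListReader, and ReaderDriver are hypothetical names that do not exist in debezium, records are plain strings instead of SourceRecord, and the checked InterruptedException is omitted. It assumes, as AbstractReader does further down, that poll() returns null once the reader has finished its work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Cut-down stand-in for the Reader contract (hypothetical, not debezium API).
interface MiniReader {
    enum State { STOPPED, RUNNING, STOPPING }

    State state();

    void start();

    void stop();

    // The real poll() also declares InterruptedException; omitted here.
    List<String> poll();
}

// Hypothetical implementation that hands out pre-built batches, then null.
class ListReader implements MiniReader {
    private final Iterator<List<String>> batches;
    private State state = State.STOPPED;

    ListReader(List<List<String>> batches) {
        this.batches = batches.iterator();
    }

    @Override public State state() { return state; }
    @Override public void start() { state = State.RUNNING; }
    @Override public void stop() { state = State.STOPPED; }

    @Override
    public List<String> poll() {
        if (state != State.RUNNING) {
            return null;
        }
        if (!batches.hasNext()) {
            // Nothing left to hand out: signal completion with null.
            state = State.STOPPED;
            return null;
        }
        return batches.next();
    }
}

class ReaderDriver {
    // Start the reader, poll until it signals completion with null, then stop it.
    static List<String> drain(MiniReader reader) {
        List<String> all = new ArrayList<>();
        reader.start();
        try {
            List<String> batch;
            while ((batch = reader.poll()) != null) {
                all.addAll(batch);
            }
        } finally {
            reader.stop();
        }
        return all;
    }

    public static void main(String[] args) {
        MiniReader reader = new ListReader(Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(drain(reader)); // prints [a, b, c]
    }
}
```

The try/finally in drain is one way to make sure stop() is always called, even if a poll fails partway through.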

AbstractReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java

public abstract class AbstractReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final String name;
    protected final MySqlTaskContext context;
    protected final MySqlJdbcContext connectionContext;
    private final BlockingQueue<SourceRecord> records;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean success = new AtomicBoolean(false);
    private final AtomicReference<ConnectException> failure = new AtomicReference<>();
    private ConnectException failureException;
    private final int maxBatchSize;
    private final Metronome metronome;
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final Duration pollInterval;
    protected final ChangeEventQueueMetrics changeEventQueueMetrics;

    private final HaltingPredicate acceptAndContinue;

    public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
        this.name = name;
        this.context = context;
        this.connectionContext = context.getConnectionContext();
        this.records = new LinkedBlockingDeque<>(context.getConnectorConfig().getMaxQueueSize());
        this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
        this.pollInterval = context.getConnectorConfig().getPollInterval();
        this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
        this.acceptAndContinue = acceptAndContinue == null ? new AcceptAllPredicate() : acceptAndContinue;
        this.changeEventQueueMetrics = new ChangeEventQueueMetrics() {

            @Override
            public int totalCapacity() {
                return context.getConnectorConfig().getMaxQueueSize();
            }

            @Override
            public int remainingCapacity() {
                return records.remainingCapacity();
            }
        };
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public final void initialize() {
        doInitialize();
    }

    @Override
    public final void destroy() {
        doDestroy();
    }

    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.failure.set(null);
            this.success.set(false);
            doStart();
        }
    }

    @Override
    public void stop() {
        try {
            // Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after
            // poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for
            // this to happen if enough records are added again between here and the call to disconnect(); protecting
            // against it seems not worth though it as shouldn't happen for any practical queue size
            List<SourceRecord> unsent = new ArrayList<>();
            records.drainTo(unsent);
            logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size());
            doStop();
            running.set(false);
        }
        finally {
            if (failure.get() != null) {
                // We had a failure and it was propagated via poll(), after which Kafka Connect will stop
                // the connector, which will stop the task that will then stop this reader via this method.
                // Since no more records will ever be polled again, we know we can clean up this reader's resources...
                doCleanup();
            }
        }
    }

    @Override
    public State state() {
        if (success.get() || failure.get() != null) {
            // We've either completed successfully or have failed, but either way no more records will be returned ...
            return State.STOPPED;
        }
        if (running.get()) {
            return State.RUNNING;
        }
        // Otherwise, we're in the process of stopping ...
        return State.STOPPING;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Before we do anything else, determine if there was a failure and throw that exception ...
        failureException = this.failure.get();
        if (failureException != null) {
            // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
            // will then explicitly stop the connector task. Most likely, however, the reader that threw
            // the exception will have already stopped itself and will generate no additional records.
            // Regardless, there may be records on the queue that will never be consumed.
            throw failureException;
        }

        // this reader has been stopped before it reached the success or failed end state, so clean up and abort
        if (!running.get()) {
            cleanupResources();
            throw new InterruptedException("Reader was stopped while polling");
        }

        logger.trace("Polling for next batch of records");
        List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
        final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
        while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
            // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
            metronome.pause();

            // Check for failure after waking up ...
            failureException = this.failure.get();
            if (failureException != null) {
                throw failureException;
            }
            if (timeout.expired()) {
                break;
            }
        }

        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            // We found no records but the operation completed successfully, so we're done
            this.running.set(false);
            cleanupResources();
            return null;
        }
        pollComplete(batch);
        logger.trace("Completed batch of {} records", batch.size());
        return batch;
    }

    @Override
    public String toString() {
        return name;
    }

    //......

}
  • AbstractReader implements the Reader interface; its poll method mainly calls records.drainTo(batch, maxBatchSize) and then pollComplete
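
The queue handling in poll() can be sketched in isolation. The BatchQueue class below is a hypothetical simplification, not debezium code: it keeps the bounded LinkedBlockingDeque, the drainTo(batch, maxBatchSize) call, and the "return null once completed and empty" rule, but leaves out the failure handling, the Metronome pause, and the timeout. Records are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the queue handling in AbstractReader.poll().
class BatchQueue {
    private final BlockingQueue<String> records;
    private final int maxBatchSize;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean success = new AtomicBoolean(false);

    BatchQueue(int maxQueueSize, int maxBatchSize) {
        // Bounded queue: capacity plays the role of getMaxQueueSize().
        this.records = new LinkedBlockingDeque<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side: returns false instead of blocking when the queue is full.
    boolean offer(String record) {
        return records.offer(record);
    }

    // Producer signals that no more records will be produced.
    void complete() {
        success.set(true);
    }

    // Single non-blocking poll attempt: at most maxBatchSize records,
    // or null once the producer has completed and the queue is empty.
    List<String> pollOnce() {
        List<String> batch = new ArrayList<>(maxBatchSize);
        records.drainTo(batch, maxBatchSize);
        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            running.set(false);
            return null;
        }
        return batch;
    }

    // Same precedence as AbstractReader.state(), minus the failure flag.
    String state() {
        if (success.get()) {
            return "STOPPED";
        }
        return running.get() ? "RUNNING" : "STOPPING";
    }

    public static void main(String[] args) {
        BatchQueue queue = new BatchQueue(10, 2);
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        System.out.println(queue.pollOnce()); // first batch is capped at 2 records
        System.out.println(queue.pollOnce()); // remaining record
        queue.complete();
        System.out.println(queue.pollOnce()); // null: completed and empty
    }
}
```

Capping each batch at maxBatchSize bounds how much work a single poll() hands to the caller, while the bounded queue limits how far the producer can run ahead of the consumer.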

BinlogReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

    private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
    private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);

    private final boolean recordSchemaChangesInSourceRecords;
    private final RecordMakers recordMakers;
    private final SourceInfo source;
    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
    private final BinaryLogClient client;

    //......

    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
        super(name, context, acceptAndContinue);

        connectionContext = context.getConnectionContext();
        source = context.source();
        recordMakers = context.makeRecord();
        recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
        clock = context.getClock();
        eventDeserializationFailureHandlingMode = connectionContext.eventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectionContext.inconsistentSchemaHandlingMode();

        // Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
        pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);

        // Set up the log reader ...
        client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password());
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false));
        client.setServerId(serverId);
        client.setSSLMode(sslModeFor(connectionContext.sslMode()));
        if (connectionContext.sslModeEnabled()) {
            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectionContext);
            if (sslSocketFactory != null) {
                client.setSslSocketFactory(sslSocketFactory);
            }
        }
        client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveInterval = context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveInterval);
        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
        // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default value of heartbeatIntervalFactor
        // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
        client.registerEventListener(context.bufferSizeForBinlogReader() == 0
                ? this::handleEvent
                : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);

        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        if (logger.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }

        //......

        client.setEventDeserializer(eventDeserializer);

        // Set up for JMX ...
        metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
        heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
                context.getConnectorConfig().getLogicalName());

    }
    
    @Override
    protected void doStart() {
        context.dbSchema().assureNonEmptySchema();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
        eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (context.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        final boolean isGtidModeEnabled = connectionContext.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
        String availableServerGtidStr = connectionContext.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connectionContext.purgedGtidSet();
            logger.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                source.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            }
            else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning ...
                client.setBinlogFilename(source.binlogFilename());
                client.setBinlogPosition(source.binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        }
        else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already processed...
        initialEventsToSkip = source.eventsToSkipUponRestart();

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = source.rowsToSkipUponRestart();

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        // Initial our poll output delay logic ...
        pollOutputDelay.hasElapsed();
        previousOutputMillis = clock.currentTimeInMillis();

        // Start the log reader, which starts background threads ...
        if (isRunning()) {
            long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis();
            long started = context.getClock().currentTimeInMillis();
            try {
                logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                client.connect(timeout);
            }
            catch (TimeoutException e) {
                // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
                // The only way we can distinguish this is if we get the timeout exception before the specified timeout has
                // elapsed, so we simply check this (within 10%) ...
                long duration = context.getClock().currentTimeInMillis() - started;
                if (duration > (0.9 * timeout)) {
                    double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                    throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                            connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
                }
                // Otherwise, we were told to shutdown, so we don't care about the timeout exception
            }
            catch (AuthenticationException e) {
                throw new ConnectException("Failed to authenticate to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
            }
            catch (Throwable e) {
                throw new ConnectException("Unable to connect to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "': " + e.getMessage(), e);
            }
        }
    }

    @Override
    protected void doStop() {
        try {
            if (client.isConnected()) {
                logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
                client.disconnect();
            }
            cleanupResources();
        }
        catch (IOException e) {
            logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
        }
    }

    @Override
    protected void pollComplete(List<SourceRecord> batch) {
        // Record a bit about this batch ...
        int batchSize = batch.size();
        recordCounter += batchSize;
        totalRecordCounter.addAndGet(batchSize);
        if (batchSize > 0) {
            SourceRecord lastRecord = batch.get(batchSize - 1);
            lastOffset = lastRecord.sourceOffset();
            if (pollOutputDelay.hasElapsed()) {
                // We want to record the status ...
                long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
                try {
                    if (logger.isInfoEnabled()) {
                        context.temporaryLoggingContext("binlog", () -> {
                            logger.info("{} records sent during previous {}, last recorded offset: {}",
                                    recordCounter, Strings.duration(millisSinceLastOutput), lastOffset);
                        });
                    }
                }
                finally {
                    recordCounter = 0;
                    previousOutputMillis += millisSinceLastOutput;
                }
            }
        }
    }
                
    //......

}
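  • BinlogReader extends AbstractReader. Its constructor creates a BinaryLogClient, registers an event listener (handleEvent, or an EventBuffer when a buffer size is configured), and sets the eventDeserializer. Its doStart method populates eventHandlers, sets either the gtidSet or the binlogFilename and binlogPosition on the client, and then calls client.connect(timeout). Its doStop method calls client.disconnect(). Its pollComplete method mainly updates counters such as recordCounter and totalRecordCounter and periodically logs progress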
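
The handleEvent method itself is elided in the excerpt above, so the following is only a guess at the general shape of an EnumMap-based dispatch, not the actual debezium logic. The EventType enum here is a hypothetical subset defined locally (the real one comes from the binlog client library), payloads are plain strings, and EventDispatchSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class EventDispatchSketch {
    // Hypothetical subset of event types, defined locally for the sketch.
    enum EventType { WRITE_ROWS, EXT_WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS, ROTATE, FORMAT_DESCRIPTION }

    private final Map<EventType, Consumer<String>> handlers = new EnumMap<>(EventType.class);
    final List<String> log = new ArrayList<>();

    EventDispatchSketch() {
        // Two event types can share one handler, as WRITE_ROWS and EXT_WRITE_ROWS do in doStart.
        handlers.put(EventType.WRITE_ROWS, this::handleInsert);
        handlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        handlers.put(EventType.UPDATE_ROWS, payload -> log.add("update:" + payload));
        handlers.put(EventType.DELETE_ROWS, payload -> log.add("delete:" + payload));
    }

    private void handleInsert(String payload) {
        log.add("insert:" + payload);
    }

    // Look up the handler for the event type; events without a handler are ignored.
    // Returns true if a handler was found and invoked.
    boolean dispatch(EventType type, String payload) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        EventDispatchSketch sketch = new EventDispatchSketch();
        sketch.dispatch(EventType.WRITE_ROWS, "row1");
        sketch.dispatch(EventType.FORMAT_DESCRIPTION, "ignored");
        System.out.println(sketch.log);
    }
}
```

Keying the handlers by enum keeps registration declarative, and optional handlers (such as the ROWS_QUERY and GTID handlers in doStart) can be added conditionally without touching the dispatch code.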

Summary

BinlogReader extends AbstractReader. The constructor creates the BinaryLogClient, registers the event listener (handleEvent), and sets the eventDeserializer. doStart populates eventHandlers, configures the client with either a gtidSet or a binlogFilename and binlogPosition, and then calls client.connect(timeout). doStop calls client.disconnect(). pollComplete mainly updates counters such as recordCounter and totalRecordCounter.
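
The choice between a GTID set and a filename/position pair in doStart can be condensed into a small decision function. This is a sketch under simplifying assumptions: StartPositionSketch and its Start value object are hypothetical, and the already-filtered GTID set is passed in as a plain string (null meaning no GTIDs have been seen yet) instead of being computed from the server's available and purged sets.

```java
class StartPositionSketch {
    // Hypothetical value object describing where the binlog client should start.
    static final class Start {
        final String gtidSet;      // non-null when starting from a GTID set
        final String binlogFile;   // used when gtidSet is null
        final long binlogPosition; // used when gtidSet is null

        Start(String gtidSet, String binlogFile, long binlogPosition) {
            this.gtidSet = gtidSet;
            this.binlogFile = binlogFile;
            this.binlogPosition = binlogPosition;
        }

        @Override
        public String toString() {
            return gtidSet != null
                    ? "gtid[" + gtidSet + "]"
                    : "file[" + binlogFile + ":" + binlogPosition + "]";
        }
    }

    // Mirrors the branching in doStart: use the filtered GTID set when GTID mode
    // is enabled and at least some GTIDs have been seen; otherwise fall back to
    // the last recorded binlog filename and position.
    static Start choose(boolean gtidModeEnabled, String filteredGtidSet,
                        String lastFile, long lastPosition) {
        if (gtidModeEnabled && filteredGtidSet != null) {
            return new Start(filteredGtidSet, null, 0L);
        }
        return new Start(null, lastFile, lastPosition);
    }

    public static void main(String[] args) {
        // The file name and GTID values below are made-up sample data.
        System.out.println(choose(true, "uuid:1-5", "mysql-bin.000003", 154L));
        System.out.println(choose(true, null, "mysql-bin.000003", 154L));
        System.out.println(choose(false, null, "mysql-bin.000003", 154L));
    }
}
```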

doc

  • BinlogReader
Originally published 2020-05-18 on the WeChat official account 码匠的流水账.
