[Long Read] A Deep Dive into the Flink CDC Source Code (worth bookmarking)

857技术社区 · published 2022-05-17 · column: 857-Bigdata

Preface

flink-cdc source code: https://github.com/ververica/flink-cdc-connectors. flink-cdc does not live inside the Flink project itself; CDC support appeared around Flink 1.11. Below we use the source code to understand how flink-cdc is implemented, focusing on flink-cdc-mysql (the other connectors are largely similar). You should first have a basic understanding of Debezium, since flink-cdc is built on top of it.

A few suggestions:

  • When reading source code, keep concrete questions in mind and work through it step by step. Don't let unimportant details eat up your time, and don't expect one or two passes to leave a clear picture; this is code written by many people over many years. Before reading a framework's source you should already understand its principles, then use the code to verify that understanding, or read with questions in mind: why is it implemented this way, and what does that buy us? In the end the code solves the same problems; only the implementation style and the concerns considered differ.
  • You need a reasonable Java foundation, especially around multithreading, and familiarity with the interfaces being used (or the ability to pick them up quickly from their descriptions). If the basics are shaky, start there first and write some test code yourself, for example how two threads hand data to each other; that will make the source much easier to follow later.
  • You should already know what CDC is and roughly how it works. flink-cdc is implemented on top of Debezium, which is open source; reading up on it first will make the rest of this article much easier to follow.

Remember: focus on the important parts while reading, and don't let unimportant details consume your time.

I. Project structure (focused on mysql-cdc)

1. Directory layout

  • Modules with "test" in the name are testing modules
  • Modules suffixed with "cdc" are connectors for one database, split into SQL and API flavors
  • flink-format-changelog-json: module that parses JSON into RowData
  • flink-connector-debezium: wraps Debezium and contains the core implementation; it also patches part of Debezium's source
  • Every module has a test directory with test code you can run and debug yourself

2. Package layout of the mysql connector source

  • debezium: classes used by Debezium
  • schema: code related to the MySQL schema (table structure)
  • source: the mysql-cdc source implementation, including the full snapshot read of MySQL, the split assigners, the readers, etc.
  • table: the CDC table implementation, mainly the dynamic table factory
  • resources: holds the SPI service file that lets the table factory be discovered dynamically, so a table created in SQL can find the right factory class (see the sketch below)
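For reference, the SPI hookup mentioned above is just a service file under resources whose content is the fully qualified name of the factory class; the exact interface and class names depend on the Flink and flink-cdc versions, so the example below is an assumption rather than a verbatim copy:

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
    com.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory

When a SQL table is declared with 'connector' = 'mysql-cdc', Flink's factory discovery scans these service files and picks the factory whose identifier matches.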

II. mysql-cdc source code: the single-parallelism SourceFunction implementation

  • Based on RichSourceFunction: single-parallelism reading, the source interface from before 1.11, already marked Deprecated
  • Based on Source: multi-parallelism, the new source interface introduced in 1.11+, with a more complex implementation

We'll mainly walk through the single-parallelism implementation, which is easier to understand.

As an entry point, we can follow the class used to create the source in the documentation, step by step.
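For orientation before diving in, here is a minimal usage sketch of this old SourceFunction-based API, roughly as shown in the flink-cdc documentation. The package names and the deserializer class vary between flink-cdc versions, so treat the imports below as assumptions:

Code language: java

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class MySqlCdcQuickStart {
    public static void main(String[] args) throws Exception {
        // Build the DebeziumSourceFunction through the builder walked through below
        SourceFunction<String> sourceFunction =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("inventory")           // databases to capture
                        .tableList("inventory.products")     // tables to capture, in db.table form
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new StringDebeziumDeserializationSchema()) // SourceRecord -> String
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).print();
        env.execute("mysql-cdc-quickstart");
    }
}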

MySqlSource is constructed with the builder pattern (one of the classic design patterns); all we really need to know is which parameters we can set, which is easy to follow.

Code language: java
 
 // The builder collects the parameters needed when the job starts
  public static class Builder<T> { // inner class of MySqlSource

       private int port = 3306; // default 3306 port
       private String hostname;
       private String[] databaseList;
       private String username;
       private String password;
       private Integer serverId;
       private String serverTimeZone; // time zone
       private String[] tableList;
       private Properties dbzProperties; // extra properties passed to the Debezium engine
       private StartupOptions startupOptions = StartupOptions.initial(); // controls where binlog consumption starts
       private DebeziumDeserializationSchema<T> deserializer; // how records are deserialized, e.g. into JSON, String, etc.
   
     // Once the parameters above are set, build() constructs the SourceFunction; it mainly packs the configuration into a Properties object. Most of these are Debezium startup/configuration parameters; see the Debezium docs for the specifics
    public DebeziumSourceFunction<T> build() {
           Properties props = new Properties();
           props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
           // hard code server name, because we don't need to distinguish it, docs:
           // Logical name that identifies and provides a namespace for the particular MySQL
           // database
           // server/cluster being monitored. The logical name should be unique across all other
           // connectors,
           // since it is used as a prefix for all Kafka topic names emanating from this connector.
           // Only alphanumeric characters and underscores should be used.
           props.setProperty("database.server.name", DATABASE_SERVER_NAME);
           props.setProperty("database.hostname", checkNotNull(hostname));
           props.setProperty("database.user", checkNotNull(username));
           props.setProperty("database.password", checkNotNull(password));
           props.setProperty("database.port", String.valueOf(port));
           props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
           // debezium use "long" mode to handle unsigned bigint by default,
           // but it'll cause lose of precise when the value is larger than 2^63,
           // so use "precise" mode to avoid it.
           props.put("bigint.unsigned.handling.mode", "precise");

           if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }
           if (databaseList != null) {  props.setProperty("database.whitelist", String.join(",", databaseList)); }
           if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}
           if (serverTimeZone != null) {  props.setProperty("database.serverTimezone", serverTimeZone);   }

      // Determine the starting consumption position from the startup options built in the source builder; null if not set
           DebeziumOffset specificOffset = null;
           switch (startupOptions.startupMode) {
               case INITIAL:
                   props.setProperty("snapshot.mode", "initial");
                   break;

               case EARLIEST_OFFSET:
                   props.setProperty("snapshot.mode", "never");
                   break;

               case LATEST_OFFSET:
                   props.setProperty("snapshot.mode", "schema_only");
                   break;

               case SPECIFIC_OFFSETS:      
                   props.setProperty("snapshot.mode", "schema_only_recovery");
                   specificOffset = new DebeziumOffset();
                   Map<String, String> sourcePartition = new HashMap<>();
                   sourcePartition.put("server", DATABASE_SERVER_NAME);
                   specificOffset.setSourcePartition(sourcePartition);

                   Map<String, Object> sourceOffset = new HashMap<>();
                   sourceOffset.put("file", startupOptions.specificOffsetFile);
                   sourceOffset.put("pos", startupOptions.specificOffsetPos);
                   specificOffset.setSourceOffset(sourceOffset);
                   break;

               case TIMESTAMP:
                   checkNotNull(deserializer);
                   props.setProperty("snapshot.mode", "never");
                   deserializer =
                           new SeekBinlogToTimestampFilter<>(
                                   startupOptions.startupTimestampMillis, deserializer);
                   break;

               default:
                   throw new UnsupportedOperationException();
          }

           if (dbzProperties != null) {
               props.putAll(dbzProperties);
               // Add default configurations for compatibility when set the legacy mysql connector
               // implementation
               if (LEGACY_IMPLEMENTATION_VALUE.equals(
                       dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {
                   props.put("transforms", "snapshotasinsert");
                   props.put(
                           "transforms.snapshotasinsert.type",
                           "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
              }
          }

       // Build the generic CDC SourceFunction --> based on RichSourceFunction
           return new DebeziumSourceFunction<>(
                   deserializer, props, specificOffset,
             new MySqlValidator(props) // MySQL validator: checks the version, whether the binlog format is ROW, etc.
          );
      }
  }

The code above is mainly about assembling the parameters the source needs; next, let's go into DebeziumSourceFunction and look at the actual implementation.

Code language: java
// The source implementation, used to read binlog, logminer, etc.
// Extends RichSourceFunction for the source logic, implements CheckpointedFunction for fault tolerance, and CheckpointListener to be notified when checkpoints complete
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
       implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {
 
 // ------------------------------ only the more important member fields are listed; the rest are omitted ------------------------------------------

   // ----------------------------------State-------------------------------------------------
/* State maintenance: after a failure restart or manual restart, these hold the schemas (the structure inside the records), the not-yet-consumed records (in the queue, as we'll see later), the offsets, and so on */
   private transient volatile String restoredOffsetState;
   private transient ListState<byte[]> offsetState;
   private transient ListState<String> schemaRecordsState;

   // -----------------------------------Worker-----------------------------------------------
/* A single-thread executor and a DebeziumEngine (a Runnable implementation) used to read binlog data;
this is where the multi-thread interaction comes from */
   private transient ExecutorService executor;
   private transient DebeziumEngine<?> engine;

   /* A consumer that receives data from the engine and puts it into the handover */
   private transient DebeziumChangeConsumer changeConsumer;

   /* Fetches data from the handover */
   private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;

   /* The bridge that exchanges data between the two threads (source and engine) */
   private transient Handover handover;

 // ---------------------------------------- we mainly walk through the source's run method; the other methods are mostly about fault tolerance --------------------------------------
 
   @Override
   public void run(SourceContext<T> sourceContext) throws Exception {
      // Parameters for running the engine; not the key point here, see the Debezium docs if interested
       properties.setProperty("name", "engine");
       properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
       if (restoredOffsetState != null) {
           properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
      }
       properties.setProperty("include.schema.changes", "false");
       properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
       properties.setProperty("tombstones.on.delete", "false");
       if (engineInstanceName == null) {
           engineInstanceName = UUID.randomUUID().toString();
      }    
       properties.setProperty(
               FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);  
       properties.setProperty("database.history", determineDatabase().getCanonicalName());
       String dbzHeartbeatPrefix =
               properties.getProperty(
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
                       Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
       this.debeziumChangeFetcher =
               new DebeziumChangeFetcher<>(
                       sourceContext,
                       deserializer,
                       restoredOffsetState == null, // are we in the snapshot phase, i.e. was no state restored?
                       dbzHeartbeatPrefix,
                       handover);

       // Create and configure the engine
       this.engine =
               DebeziumEngine.create(Connect.class)
                      .using(properties) // configuration
                      .notifying(changeConsumer) // the consumer that receives what the engine reads (binlog / historical data)
                      .using(OffsetCommitPolicy.always()) // offset commit policy
                      .using(
                              (success, message, error) -> {
                                   if (success) {
                                       handover.close();
                                  } else {
                                       handover.reportError(error);
                                  }
                              })
                      .build();

       // Submit the engine task to the executor
       executor.execute(engine);
       debeziumStarted = true;

       // metric-related setup
       MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
       // ....
     
       // Start the fetcher, which loops pulling the latest data from the handover and emitting it downstream
       debeziumChangeFetcher.runFetchLoop();
  }
   
}

We've now seen the basic implementation of source.run; the main processing logic lives in DebeziumChangeConsumer, DebeziumChangeFetcher, and Handover.

Here is a brief look at what these three classes do and at their main methods and parameters.

DebeziumChangeConsumer: consumes the data read by the engine

Code language: java
/* This class implements the DebeziumEngine.ChangeConsumer interface; its handleBatch method is fairly simple, and the other two member methods are offset-related and not important here */
// The engine thread calls handleBatch to hand over the data the engine has consumed
public class DebeziumChangeConsumer
   implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
@Override
public void handleBatch(
   List<ChangeEvent<SourceRecord, SourceRecord>> events,
   RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
try {
   currentCommitter = recordCommitter;
   // Indirectly calls handover.produce, which blocks (it waits if previously produced records have not been consumed yet)
   handover.produce(events);
} catch (Throwable e) {
   // Hold this exception in handover and trigger the fetcher to exit
   handover.reportError(e);
      }
 }
} 

DebeziumChangeFetcher: loops pulling from the handover the latest data that the consumer received from the engine

Code language: java
public class DebeziumChangeFetcher<T> {

private final SourceFunction.SourceContext<T> sourceContext;
/* Lock that keeps record emission and offset/state updates consistent */
private final Object checkpointLock;
/* Converts records into our own target type, e.g. JSON, String */
private final DebeziumDeserializationSchema<T> deserialization;

/* The custom collector defined below */
private final DebeziumCollector debeziumCollector;
/* Self-explanatory */
private final DebeziumOffset debeziumOffset;
/* Serializer for the offset stored in state */
private final DebeziumOffsetSerializer stateSerializer;
/* Heartbeat-related */
private final String heartbeatTopicPrefix;
/* Whether we were restored into the snapshot phase, i.e. still need to consume the historical data */
private boolean isInDbSnapshotPhase;
private final Handover handover;

public void runFetchLoop() throws Exception {
  try {
      // Read the historical MySQL data first; don't be misled by the name
      if (isInDbSnapshotPhase) {
          List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
             
             synchronized (checkpointLock) {
                 LOG.info(
                         "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");              
                 handleBatch(events);
                 // The snapshot data may not arrive in a single batch; we must finish reading all snapshot data before moving on to the binlog
                 while (isRunning && isInDbSnapshotPhase) {
                     handleBatch(handover.pollNext());
                 }
             }
             LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
             }

         // Reaching here means the snapshot data has been fully read; start reading binlog data in real time
         while (isRunning) {
             // the actual data handling; pollNext blocks
             handleBatch(handover.pollNext());
         }
         } catch (Handover.ClosedException e) {
             // ignore
         }
 }


 private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
        throws Exception {
    if (CollectionUtils.isEmpty(changeEvents)) {
        return;
    }
    this.processTime = System.currentTimeMillis();

     for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
         SourceRecord record = event.value();
         // The time fields are basically metric-related; don't overthink them
         updateMessageTimestamp(record);
         fetchDelay = processTime - messageTimestamp;

         // Update the offset via the heartbeat mechanism
         if (isHeartbeatEvent(record)) {
             synchronized (checkpointLock) {
                 debeziumOffset.setSourcePartition(record.sourcePartition());
                 debeziumOffset.setSourceOffset(record.sourceOffset());
             }
             continue;
         }
               // Convert the record with the configured deserialization ---> StringDebeziumDeserializationSchema is the easiest to understand: it just calls record.toString(). In short, the SourceRecord Debezium read is converted into the format/type we want; debeziumCollector is the custom collector below, and deserialize puts the converted data into its queue
         deserialization.deserialize(record, debeziumCollector);

       // Check whether this record is still a snapshot record; once a non-snapshot record shows up, switch to the binlog streaming phase after it
         if (!isSnapshotRecord(record)) {
             LOG.debug("Snapshot phase finishes.");
             isInDbSnapshotPhase = false; // used in runFetchLoop
         }
         // Actually emit the records
         emitRecordsUnderCheckpointLock(
                 debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
       }
       }

 private void emitRecordsUnderCheckpointLock(
        Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {

     // The synchronization keeps record emission and offset updates consistent; the lock is reentrant (basic Java knowledge)
     synchronized (checkpointLock) {
         T record;
         // Drain the debeziumCollector's records queue and emit each record downstream
         while ((record = records.poll()) != null) {
             emitDelay = System.currentTimeMillis() - messageTimestamp;
             // Emit to the downstream operator via the source context; from here on we are in Flink's own processing logic, outside the CDC code
             sourceContext.collect(record);
         }
         debeziumOffset.setSourcePartition(sourcePartition);
         debeziumOffset.setSourceOffset(sourceOffset);
     }
     }

   // Heartbeat check, part of the offset-update mechanism
   private boolean isHeartbeatEvent(SourceRecord record) {
     String topic = record.topic();
     return topic != null && topic.startsWith(heartbeatTopicPrefix);
   }

 // -------------------------------- custom collector -------------------------------------------------------

 private class DebeziumCollector implements Collector<T> {
   private final Queue<T> records = new ArrayDeque<>();
   @Override
   public void collect(T record) {
      // Put the record into the queue; the queue is drained elsewhere and the records are sent downstream
       records.add(record);
   }
 }
 }
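The deserialization.deserialize(record, debeziumCollector) call above is where the SourceRecord read by Debezium becomes our own type. As a point of reference, a minimal custom deserializer could look like the sketch below, modeled on the StringDebeziumDeserializationSchema mentioned above; the package of DebeziumDeserializationSchema depends on the flink-cdc version, so the imports are assumptions:

Code language: java

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

/** Minimal sketch: turn every Debezium SourceRecord into its toString() form. */
public class RecordToStringDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // The collector here is the DebeziumCollector shown above: whatever we emit
        // lands in its queue and is later sent downstream under the checkpoint lock.
        out.collect(record.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}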

Handover: the bridge over which the source thread and the engine thread exchange data

Code language: java
/* This class is accessed by two threads: pollNext is called by the DebeziumChangeFetcher and produce by the DebeziumChangeConsumer. Because of the multi-thread interaction, reading the code alone may not be enough; brush up on Java threading or debug the call flow yourself and it becomes much clearer */
@ThreadSafe // the class is thread-safe; it is touched by both the engine thread and the source thread, and the implementation guarantees that safety
public class Handover implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(Handover.class);
  private final Object lock = new Object();

  @GuardedBy("lock") // the field is protected by lock; not important here
  private List<ChangeEvent<SourceRecord, SourceRecord>> next;

  @GuardedBy("lock")
  private Throwable error;

  private boolean wakeupProducer;

  /* Called by the fetcher; when there is no data it waits. A waiting thread is not scheduled by the CPU, so the other thread can use the whole time slice */
  public List<ChangeEvent<SourceRecord, SourceRecord>> pollNext() throws Exception {
     // wait/notifyAll can only be used inside a synchronized block; with only two threads this is the simplest approach, while more threads would call for java.util.concurrent locks or another mechanism
      synchronized (lock) {
          // keep looping into wait while there is no data and no error, to guard against spurious wakeups
          while (next == null && error == null) {
              lock.wait();
          }
          List<ChangeEvent<SourceRecord, SourceRecord>> n = next;
          // if the loop exits there must be either data or an error; no other case is possible
          if (n != null) {
            // reset next to null; produce() uses this as its wait condition
              next = null;
            // wake up the waiting thread, which can only be the engine thread
              lock.notifyAll();
              return n;
          } else {
            // rethrow the error
              ExceptionUtils.rethrowException(error, error.getMessage());
         // the call above always throws; this return only exists to silence the compiler warning
              return Collections.emptyList();
          }
      }
  }

  public void produce(final List<ChangeEvent<SourceRecord, SourceRecord>> element)
          throws InterruptedException {

      checkNotNull(element);

      synchronized (lock) {
        // wait while next is still set, i.e. the previous batch has not been consumed yet
          while (next != null && !wakeupProducer) {
              lock.wait();
          }

          wakeupProducer = false;

          // if there is an error, rethrow it; otherwise accept the new batch and wake up the fetcher thread
          if (error != null) {
              ExceptionUtils.rethrow(error, error.getMessage());
          } else {
              next = element;
              lock.notifyAll();
          }
      }
  }
}
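To make the two-thread hand-off more concrete, here is a tiny self-contained demo of the same wait/notifyAll pattern. It is my own simplified sketch, not flink-cdc code: one thread plays the engine (produce), the main thread plays the fetcher (pollNext), and a single field plays the role of next.

Code language: java

import java.util.ArrayList;
import java.util.List;

public class HandoverDemo {
    private final Object lock = new Object();
    private List<String> next; // the single hand-off slot, like Handover.next

    // fetcher side: block until a batch is available
    public List<String> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {      // the loop guards against spurious wakeups
                lock.wait();
            }
            List<String> batch = next;
            next = null;                // free the slot
            lock.notifyAll();           // wake up the producer
            return batch;
        }
    }

    // engine side: block until the previous batch has been consumed
    public void produce(List<String> batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = batch;
            lock.notifyAll();           // wake up the consumer
        }
    }

    public static void main(String[] args) throws Exception {
        HandoverDemo handover = new HandoverDemo();
        Thread engine = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    List<String> batch = new ArrayList<>();
                    batch.add("record-" + i);
                    handover.produce(batch);
                }
            } catch (InterruptedException ignored) {
            }
        });
        engine.start();
        for (int i = 0; i < 3; i++) {
            System.out.println(handover.pollNext()); // [record-0], [record-1], [record-2]
        }
        engine.join();
    }
}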

That is the main CDC code based on RichSourceFunction. It isn't really hard, but the original authors have already thought through many edge cases, the abstractions are clean, the code is easy to extend, and the API is easy for us as developers to use.

III. mysql-cdc source code: the implementation based on the new Source interface

After version 1.11 Flink provides a new source interface; you can read up on it in advance at https://issues.apache.org/jira/browse/FLINK-10740

A quick primer:

  • SourceReader: reads the data of a split, e.g. one partition or one chunk; it is not limited to a single partition, that depends on the implementation.
  • SplitEnumerator: splits the data source and discovers new work, e.g. discovering Kafka partitions or dividing files into blocks.
  • The description above is simplified; the reality is a bit more involved, so implementing a source against the new interface is harder until you are familiar with it.
  • Terminology up front: a split can be thought of as a slice. In mysql-cdc, hypothetically, it is a part of a table, say from start key 1 to end key 10; the split records those boundaries, and a read task uses them to read the data. One read task will execute more than one split.
  • snapshot means reading the historical, full data of the database; binlog means that once the snapshot phase ends we enter the binlog phase, i.e. we start reading binlog data. The snapshot phase runs first, then the binlog phase.

Building the source looks the same as in the old version; only the internal execution logic changes. The CDC code built on the new source interface is more complex and touches many more pieces, so it can be confusing; debugging through the source yourself afterwards helps. Because there is so much code, we only cover the important parts and skip the rest. A small usage sketch of the new API follows.
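For reference, a minimal usage sketch of the new Source-based API, roughly as in the flink-cdc 2.x documentation; the package names and the JsonDebeziumDeserializationSchema class are assumptions that depend on the flink-cdc version:

Code language: java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlNewSourceQuickStart {
    public static void main(String[] args) throws Exception {
        // MySqlSourceBuilder collects the options into a MySqlSourceConfigFactory, as described below
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("inventory")
                        .tableList("inventory.products")
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema()) // SourceRecord -> JSON string
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpoints drive offset commits in the new source
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(4)     // the new source can read snapshot splits in parallel
                .print();
        env.execute("mysql-cdc-new-source");
    }
}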

Code language: java
// Implements two interfaces: Source, and ResultTypeQueryable (a simple interface that just exposes the produced type); the main code is the Source implementation
// T is the output type, MySqlSplit is the MySQL split type, PendingSplitsState is the state object of the enumerator
public class MySqlSource<T>
       implements Source<T, MySqlSplit, PendingSplitsState>, ResultTypeQueryable<T> {
 

   private final MySqlSourceConfigFactory configFactory;
   private final DebeziumDeserializationSchema<T> deserializationSchema;

   /* The parameters the source needs are assembled via the builder pattern: the options are added to a MySqlSourceConfigFactory, and build() passes that factory in to construct the MySqlSource.
    ------------------------------------- how the configs relate ------------------------------------------------
  MySqlSourceConfigFactory creates the MySqlSourceConfig for each subtask
  MySqlSourceConfig can build a MySqlConnectorConfig
  MySqlConnection is built via DebeziumUtils.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())
     The configs are a bit confusing and the names are not intuitive; they will be mentioned again when used. For now just keep a rough picture and don't get lost in the configuration classes.
   */
   public static <T> MySqlSourceBuilder<T> builder() {
       return new MySqlSourceBuilder<>();
  }

 // Created by MySqlSourceBuilder.build
   MySqlSource(
           MySqlSourceConfigFactory configFactory,
           DebeziumDeserializationSchema<T> deserializationSchema // same deserialization schema as in the old source
  ) {
       this.configFactory = configFactory;
       this.deserializationSchema = deserializationSchema;
  }

   @Override  // unified batch/stream source: declares the boundedness, a feature of the new source interface
   public Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED; }

  /* Build the SourceReader */
   @Override
   public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
           throws Exception {
    // As mentioned above, create the config for this subtask index
       MySqlSourceConfig sourceConfig =
               configFactory.createConfig(readerContext.getIndexOfSubtask());
       // A blocking queue used for cross-thread hand-off; no need to dig in
       FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
               new FutureCompletingBlockingQueue<>();
    // metrics
       final MySqlSourceReaderMetrics sourceReaderMetrics =
               new MySqlSourceReaderMetrics(readerContext.metricGroup());
       sourceReaderMetrics.registerMetrics();
     // Build the SplitReader through a Supplier (for decoupling); the interesting part is the MySqlSplitReader implementation
       Supplier<MySqlSplitReader> splitReaderSupplier =
         // each reader gets its own config and its subtask index
              () -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
   
     // Build the concrete SourceReader
       return new MySqlSourceReader<>(
               elementsQueue,
               splitReaderSupplier,
               new MySqlRecordEmitter<>(
                       deserializationSchema,
                       sourceReaderMetrics,
                       sourceConfig.isIncludeSchemaChanges()),
               readerContext.getConfiguration(),
               readerContext,
               sourceConfig);
  }

   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext) {
     // This is only created once, so a single sourceConfig is enough
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
// validate MySQL (version, binlog format, etc.)
       final MySqlValidator validator = new MySqlValidator(sourceConfig);
       validator.validate();

       final MySqlSplitAssigner splitAssigner;
    // If the startup mode is initial, first read the table data (called snapshot in the code) and then continue with the binlog; otherwise start reading directly from the binlog
       if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
           try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
               final List<TableId> remainingTables = discoverCapturedTables(jdbc, sourceConfig);
               boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
               splitAssigner =
                // contains both the snapshot and the binlog split logic
                       new MySqlHybridSplitAssigner(
                               sourceConfig,
                               enumContext.currentParallelism(),
                               remainingTables,
                               isTableIdCaseSensitive);
          } catch (Exception e) {
               throw new FlinkRuntimeException(
                       "Failed to discover captured tables for enumerator", e);
          }
      } else {
        // only the binlog split logic
           splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
      }
// Create the SplitEnumerator, which produces splits for the readers to read
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }

// Restore the SplitEnumerator, e.g. after a failure restart, from the checkpointed state, so that unfinished reads can continue
   @Override
   public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
           SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
       MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

       final MySqlSplitAssigner splitAssigner;
       if (checkpoint instanceof HybridPendingSplitsState) {
           splitAssigner =
                   new MySqlHybridSplitAssigner(
                           sourceConfig,
                           enumContext.currentParallelism(),
                          (HybridPendingSplitsState) checkpoint);
      } else if (checkpoint instanceof BinlogPendingSplitsState) {
           splitAssigner =
                   new MySqlBinlogSplitAssigner( sourceConfig, (BinlogPendingSplitsState) checkpoint);
      } else {
           throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint);
      }
       return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
  }

 // ----------------- fault tolerance, not the focus -----------------
   @Override
   public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() { return MySqlSplitSerializer.INSTANCE; }

   @Override
   public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() { return new PendingSplitsStateSerializer(getSplitSerializer());}

// extract the produced result type
   @Override
   public TypeInformation<T> getProducedType() {return deserializationSchema.getProducedType();}
}

From the code above we can see that the Source implementation mainly builds the SourceReader and the SplitEnumerator, plus the fault-tolerance pieces; the actual processing logic is encapsulated in those objects, which we dissect step by step below.
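Before the assigners, a quick illustration of what a snapshot split means. The sketch below is my own simplified version of the idea, not the real chunk splitter (which lives in the snapshot assigner and also handles uneven key distributions, non-numeric keys, and so on): the table is cut into primary-key ranges, and each range becomes one snapshot split that a read task can process independently.

Code language: java

import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {
    /** A primary-key range standing in for one snapshot split. */
    static class KeyRange {
        final Long start; // null means unbounded on that side
        final Long end;
        KeyRange(Long start, Long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Cut [minKey, maxKey] into chunks of roughly chunkSize keys each.
    static List<KeyRange> splitIntoChunks(long minKey, long maxKey, long chunkSize) {
        List<KeyRange> splits = new ArrayList<>();
        Long start = null;                      // first split is open on the left
        long next = minKey + chunkSize;
        while (next <= maxKey) {
            splits.add(new KeyRange(start, next));
            start = next;
            next += chunkSize;
        }
        splits.add(new KeyRange(start, null));  // last split is open on the right
        return splits;
    }

    public static void main(String[] args) {
        // e.g. keys 1..1000 with chunk size 250 -> [null, 251) [251, 501) [501, 751) [751, null)
        splitIntoChunks(1, 1000, 250).forEach(System.out::println);
    }
}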

Code language: java
/* Before the rest, let's see how MySQL data is split: in the snapshot phase splits are cut by primary key, while the binlog split simply starts consuming from the current offset.
This is the hybrid assigner; there are also dedicated binlog and snapshot split assigners, but here we only follow the overall logic, and you can read into a specific one yourself.
To spell it out: first the historical MySQL data is read (the snapshot phase), then consumption continues from the current binlog position. The point of the hybrid assigner is exactly that: full data first, then the latest binlog, which together make up the CDC read */
public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {

    private final int splitMetaGroupSize;

    private boolean isBinlogSplitAssigned;

    private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            List<TableId> remainingTables,
            boolean isTableIdCaseSensitive) {
        this(
             // create the snapshot split assigner
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
                false,
                sourceConfig.getSplitMetaGroupSize());
    }

    public MySqlHybridSplitAssigner(
            MySqlSourceConfig sourceConfig,
            int currentParallelism,
            HybridPendingSplitsState checkpoint) {
        this(
                new MySqlSnapshotSplitAssigner(
                        sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
                checkpoint.isBinlogSplitAssigned(),
                sourceConfig.getSplitMetaGroupSize());
    }

    private MySqlHybridSplitAssigner(
            MySqlSnapshotSplitAssigner snapshotSplitAssigner,
            boolean isBinlogSplitAssigned,
            int splitMetaGroupSize) {
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.isBinlogSplitAssigned = isBinlogSplitAssigned;
        this.splitMetaGroupSize = splitMetaGroupSize;
    }

    @Override
    public void open() {
        snapshotSplitAssigner.open();
    }
  
  // Returns the next split, or empty if there is none; Optional is a JDK 8 class used to avoid null pointers
    @Override
    public Optional<MySqlSplit> getNext() {
      // the checks below are self-explanatory
        if (snapshotSplitAssigner.noMoreSplits()) {
            if (isBinlogSplitAssigned) {
                return Optional.empty();
            } else if (snapshotSplitAssigner.isFinished()) { // once the snapshot phase is finished, start the binlog split flow
                // we need to wait snapshot-assigner to be finished before
                // assigning the binlog split. Otherwise, records emitted from binlog split
                // might be out-of-order in terms of same primary key with snapshot splits.
                isBinlogSplitAssigned = true;
                return Optional.of(createBinlogSplit());
            } else {
                // binlog split is not ready by now
                return Optional.empty();
            }
        } else {
            // snapshot assigner still have remaining splits, assign split from it
            return snapshotSplitAssigner.getNext();
        }
    }
  
   // Whether the assigner is still waiting for the finished-splits callback, i.e. onFinishedSplits
    @Override
    public boolean waitingForFinishedSplits() {
        return snapshotSplitAssigner.waitingForFinishedSplits();
    }
  // Get the finished splits together with their metadata; the corresponding binlog split can be generated from the finished snapshot splits
    @Override
    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
        return snapshotSplitAssigner.getFinishedSplitInfos();
    }
  // Callback that handles finished splits using their final binlog offsets; it determines when to create the binlog split and what it should contain
    @Override
    public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
        snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);
    }

   // Add a set of splits back to this assigner; called when some splits failed and need to be re-assigned
    @Override
    public void addSplits(Collection<MySqlSplit> splits) {
        List<MySqlSplit> snapshotSplits = new ArrayList<>();
        for (MySqlSplit split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
            } else {
                // we don't store the split, but will re-create binlog split later
                isBinlogSplitAssigned = false;
            }
        }
        snapshotSplitAssigner.addSplits(snapshotSplits);
    }
 // ---------------------------- checkpoint / fault tolerance ----------------------------------------
    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(
                snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void close() {
        snapshotSplitAssigner.close();
    }

    // ------------------------------------ binlog split part -------------------------------------------
  // Build the binlog split from the already finished snapshot splits; the split code itself is simple enough to read on your own
   // In short it describes the binlog split in terms of the finished snapshot splits (which were cut by primary key) plus the table schema information
    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());

        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                BINLOG_SPLIT_ID,
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
}

Now let's look at the SourceReader and the SplitEnumerator.

SourceReader:

Code language: java

/* SingleThreadMultiplexSourceReaderBase */
public class MySqlSourceReader<T>
     extends SingleThreadMultiplexSourceReaderBase<SourceRecord, T, MySqlSplit, MySqlSplitState> {

 private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

 private final MySqlSourceConfig sourceConfig;
 private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
 private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
 private final int subtaskId;

 public MySqlSourceReader(
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
         Supplier<MySqlSplitReader> splitReaderSupplier,
         RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
         Configuration config,
         SourceReaderContext context,
         MySqlSourceConfig sourceConfig) {
     super(
             elementQueue,
          // A single-thread fetcher manager that drives the reading
          // Rough flow:
          // SingleThreadFetcherManager.createSplitFetcher builds a SplitFetcher (a Runnable) which holds a fetch task; SplitFetcher.run loops calling this.runOnce(), which keeps invoking fetchTask.run() to read data; that in turn calls MySqlSplitReader.fetch, and the records it returns are put into the elementsQueue. Like most multi-threaded code, it takes a while to digest
             new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
             recordEmitter,
             config,
             context);
     this.sourceConfig = sourceConfig;
     this.finishedUnackedSplits = new HashMap<>();
     this.uncompletedBinlogSplits = new HashMap<>();
     this.subtaskId = context.getIndexOfSubtask();
 }
  // Start the reader
 @Override
 public void start() {
     if (getNumberOfCurrentlyAssignedSplits() == 0) {
        // Send a split request to the SplitEnumerator; this ends up in SplitEnumerator.handleSplitRequest(int, String), carrying this parallel reader's subtask id and hostname
         context.sendSplitRequest();
     }
 }
  // When the reader is assigned a new split, initialize the split's state
 @Override
 protected MySqlSplitState initializedState(MySqlSplit split) {
     if (split.isSnapshotSplit()) {
         return new MySqlSnapshotSplitState(split.asSnapshotSplit());
     } else {
         return new MySqlBinlogSplitState(split.asBinlogSplit());
     }
 }

 @Override // fault tolerance, skip
 public List<MySqlSplit> snapshotState(long checkpointId) {
     // unfinished splits
     List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
     // add finished snapshot splits that didn't receive ack yet
     stateSplits.addAll(finishedUnackedSplits.values());
     // add binlog splits who are uncompleted
     stateSplits.addAll(uncompletedBinlogSplits.values());
     return stateSplits;
 }
  // Clean up the state of finished splits; not the focus
 @Override
 protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
     for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
         MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
         checkState(
                 mySqlSplit.isSnapshotSplit(),
                 String.format(
                         "Only snapshot split could finish, but the actual split is binlog split %s",
                         mySqlSplit));
         finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
     }
     reportFinishedSnapshotSplitsIfNeed();
     context.sendSplitRequest();
 }
   /* Add the splits this reader should read. Called when the SplitEnumerator assigns a split through the SplitEnumeratorContext,
i.e. context.assignSplit(SourceSplit, int) or context.assignSplits(SplitsAssignment).
 */
 @Override
 public void addSplits(List<MySqlSplit> splits) {
     List<MySqlSplit> unfinishedSplits = new ArrayList<>();
     for (MySqlSplit split : splits) {
        // Is it a snapshot split or a binlog split?
         if (split.isSnapshotSplit()) {
             // If the snapshot split has already been read completely, put it in the finished set; otherwise in the unfinished list
             MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
             if (snapshotSplit.isSnapshotReadFinished()) {
                 finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
             } else {
                 unfinishedSplits.add(split);
             }
         } else {
             if (!split.asBinlogSplit().isCompletedSplit()) {
                 // If the binlog split is not complete yet, add it to the uncompleted map and send the SplitEnumerator a request for the binlog split meta
                 uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                 requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
             } else {
                 // Remove it from the uncompleted map (uncompleted means it still lacks the split meta)
                 uncompletedBinlogSplits.remove(split.splitId());
                 // Create the binlog split enriched with the table schema information
                 MySqlBinlogSplit mySqlBinlogSplit =
                         discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                 // Add it to the unfinished splits; it will be read later
                 unfinishedSplits.add(mySqlBinlogSplit);
             }
         }
     }
     // notify split enumerator again about the finished unacked snapshot splits
     reportFinishedSnapshotSplitsIfNeed();
     // add all un-finished splits (including binlog split) to SourceReaderBase
     // When super.addSplits is called, the fetcher manager is started and the actual reading begins
     super.addSplits(unfinishedSplits);
 }

 private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
     final String splitId = split.splitId();
    // If the table schemas are missing, fill them in; if they already exist, just return the split
     if (split.getTableSchemas().isEmpty()) {
         try (MySqlConnection jdbc =
              // static helper that builds a MySqlConnection, essentially a JDBC connection; no need to dig in
                 DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
             Map<TableId, TableChanges.TableChange> tableSchemas =
                // static helper that, based on the database and table list given to the source builder, discovers the TableId -> TableChange mapping we will need when reading; no need to dig into the utility
              TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
             LOG.info("The table schema discovery for binlog split {} success", splitId);
             // static helper that builds a MySqlBinlogSplit carrying the table schemas; no need to dig in
             return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
         } catch (SQLException e) {
             LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
             throw new FlinkRuntimeException(e);
         }
     } else {

         LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);
         return split;
     }
 }

// Handle custom source events coming from the SplitEnumerator; mirrors the enumerator side
 @Override
 public void handleSourceEvents(SourceEvent sourceEvent) {
     if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
         FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
         LOG.debug(
                 "The subtask {} receives ack event for {} from enumerator.",
                 subtaskId,
                 ackEvent.getFinishedSplits());
         for (String splitId : ackEvent.getFinishedSplits()) {
             this.finishedUnackedSplits.remove(splitId);
         }
     } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
         // report finished snapshot splits
         LOG.debug(
                 "The subtask {} receives request to report finished snapshot splits.",
                 subtaskId);
         reportFinishedSnapshotSplitsIfNeed();
     } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
         LOG.debug(
                 "The subtask {} receives binlog meta with group id {}.",
                 subtaskId,
                 ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
         fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
     } else {
         super.handleSourceEvents(sourceEvent);
     }
 }
   // Send the event requesting the binlog split meta
 private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
     final String splitId = binlogSplit.splitId();
     if (!binlogSplit.isCompletedSplit()) {
         final int nextMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         BinlogSplitMetaRequestEvent splitMetaRequestEvent =
                 new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
         context.sendSourceEventToCoordinator(splitMetaRequestEvent);
     } else {
         LOG.info("The meta of binlog split {} has been collected success", splitId);
         this.addSplits(Arrays.asList(binlogSplit));
     }
 }
  // After sending the meta request event we receive the binlog split meta, which has to be filled into the binlog split
 private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
     MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
     if (binlogSplit != null) {
         final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
         final int expectedMetaGroupId =
                 getNextMetaGroupId(
                         binlogSplit.getFinishedSnapshotSplitInfos().size(),
                         sourceConfig.getSplitMetaGroupSize());
         if (receivedMetaGroupId == expectedMetaGroupId) {
             List<FinishedSnapshotSplitInfo> metaDataGroup =
                     metadataEvent.getMetaGroup().stream()
                             .map(FinishedSnapshotSplitInfo::deserialize)
                             .collect(Collectors.toList());
             uncompletedBinlogSplits.put(
                     binlogSplit.splitId(),
                     MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
             LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
         } else {
             LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",metadataEvent.getSplitId(), receivedMetaGroupId,expectedMetaGroupId);
         }
        // keep requesting the next meta group if needed
         requestBinlogSplitMetaIfNeeded(binlogSplit);
     } else {
         LOG.warn( "Received binlog meta event for split {}, but the uncompleted split map does not contain it", metadataEvent.getSplitId());
     }
 }
   // Turn the mutable split state back into an immutable split
 @Override
 protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); }
}

SplitEnumerator:

  • Handles split requests from the SourceReader
  • Assigns splits to the SourceReader
Code language: java
// Implements SplitEnumerator and overrides its methods
public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, PendingSplitsState> {
    private static final long CHECK_EVENT_INTERVAL = 30_000L;
    private final SplitEnumeratorContext<MySqlSplit> context;
    private final MySqlSourceConfig sourceConfig;
    private final MySqlSplitAssigner splitAssigner;
    // using TreeSet to prefer assigning binlog split to task-0 for easier debug
    private final TreeSet<Integer> readersAwaitingSplit;
    private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;

    public MySqlSourceEnumerator(
            SplitEnumeratorContext<MySqlSplit> context,
            MySqlSourceConfig sourceConfig,
            MySqlSplitAssigner splitAssigner) {
       // the context object passed in from source.createEnumerator
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet<>();
    }

    @Override
    public void start() {
        splitAssigner.open(); // call the assigner's open method; see each MySqlSplitAssigner implementation for the details
        // Register a Callable that is invoked periodically; its job is to check for missed notifications after a reader has a communication failure or restarts. Not the focus here
        this.context.callAsync(
                this::getRegisteredReader,
                this::syncWithReaders,
                CHECK_EVENT_INTERVAL,
                CHECK_EVENT_INTERVAL);
    }

    // Handle a split request; called when the reader with the given subtask id calls SourceReaderContext.sendSplitRequest()
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
    // put the requesting subtask id into the waiting set
        readersAwaitingSplit.add(subtaskId);
       // assign splits to the subtasks in the waiting set
        assignSplits();
    }
  // Add splits back to the enumerator; this only happens for splits that were assigned after the last successful checkpoint and therefore need to be reprocessed
    @Override
    public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
        LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    // Handle custom events from the SourceReader
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
       // SourceEvent from the reader telling the enumerator that snapshot splits have been read completely, together with their consistent binlog positions
        if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {
            LOG.info(
                    "The enumerator receives finished split offsets {} from subtask {}.",
                    sourceEvent,
                    subtaskId);
            FinishedSnapshotSplitsReportEvent reportEvent =
                    (FinishedSnapshotSplitsReportEvent) sourceEvent;
            Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();
            // handled by the splitAssigner, as introduced above
            splitAssigner.onFinishedSplits(finishedOffsets);
            // send an ACK event back to the reader confirming the finished snapshot splits
            FinishedSnapshotSplitsAckEvent ackEvent =
                    new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));
            context.sendEventToSourceReader(subtaskId, ackEvent);
        } 
       // SourceEvent from the reader asking for the binlog split metadata, answered with a BinlogSplitMetaEvent
      else if (sourceEvent instanceof BinlogSplitMetaRequestEvent) {
            LOG.debug(
                    "The enumerator receives request for binlog split meta from subtask {}.",
                    subtaskId);
          // send the binlog meta
            sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
        }
    }

    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        splitAssigner.notifyCheckpointComplete(checkpointId);
        // binlog split may be available after checkpoint complete
        assignSplits();
    }

    // ------------------------------------------------------------------------------------------
  // Assign splits to the subtasks in the waiting set
    private void assignSplits() {
      // the TreeSet iterator is sorted, so waiting subtasks are handled in subtask-id order
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            // if the requesting reader failed in the meantime, remove it from the waiting set
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }
            Optional<MySqlSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final MySqlSplit mySqlSplit = split.get();
                // assign the split to the subtask
                context.assignSplit(mySqlSplit, nextAwaiting);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
            } else {
                // there is no available splits by now, skip assigning
               // the assigner returned empty (as seen above); nothing to assign right now
                break;
            }
        }
    }

  // Send the binlog meta event to the reader
    private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
        // if the binlog split meta has not been built yet, initialize it
        if (binlogSplitMeta == null) {
            final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
                    splitAssigner.getFinishedSplitInfos();
            if (finishedSnapshotSplitInfos.isEmpty()) {
                LOG.error(
                        "The assigner offer empty finished split information, this should not happen");
                throw new FlinkRuntimeException(
                        "The assigner offer empty finished split information, this should not happen");
            }
            binlogSplitMeta =
                    Lists.partition(
                            finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
        }
        final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

        if (binlogSplitMeta.size() > requestMetaGroupId) {
           // get the corresponding list of FinishedSnapshotSplitInfo, serialize it, and build the meta event
            List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
            BinlogSplitMetaEvent metadataEvent =
                    new BinlogSplitMetaEvent(
                            requestEvent.getSplitId(),
                            requestMetaGroupId,
                            metaToSend.stream()
                                    .map(FinishedSnapshotSplitInfo::serialize)
                                    .collect(Collectors.toList()));
           // send the generated meta event to the reader
            context.sendEventToSourceReader(subTask, metadataEvent);
        } else {
            LOG.error(
                    "Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
                    requestMetaGroupId,
                    binlogSplitMeta.size() - 1);
        }
    }
}

In the two classes above we haven't seen the actual data-reading logic yet. In fact, the work starts as soon as addSplits() is called. The call chain is fairly long, so here is how the code enters the execution logic, step by step:

  • The fetcherManager created in MySqlSourceReader is stored in a field of the parent class
  • When the SourceReader's addSplits is called, it delegates to the parent class's addSplits method
  • Which in turn calls addSplits on the fetcherManager we passed in
  • The subclass does not override that method, so the parent's implementation runs: if no fetcher is running yet, it creates one (a Runnable) and submits it to the thread pool to execute

At this point the fetcher has been started, so let's see what it actually does (remember that a queue was passed in above; the data the fetcher reads goes into that queue). When createSplitFetcher is called, what actually gets created is a SplitFetcher, a class provided by Flink's new source framework.

Code language: java
/* SplitFetcher is a Runnable, so we can go straight to its run method.
  The flow, briefly:
   1. The constructor receives a splitReader, which is responsible for the real reading (here a MySqlSplitReader)
   2. The constructor also builds a FetchTask; once run() starts, the task gets executed. Recall that after the fetcher is started we call its addSplits method, which wraps the splits into a task and adds it to the taskQueue
   3. There is some idle/wakeup housekeeping that I removed here; it is not CDC-specific, so don't spend time on it
*/
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { // class declaration added back for context
   private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);
    private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

    private final int id;
    private final BlockingDeque<SplitFetcherTask> taskQueue;
    // track the assigned splits so we can suspend the reader when there is no splits assigned.
    private final Map<String, SplitT> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final SplitReader<E, SplitT> splitReader;
    private final Consumer<Throwable> errorHandler;
    private final Runnable shutdownHook;
    private final AtomicBoolean wakeUp;
    private final AtomicBoolean closed;
    private final FetchTask<E, SplitT> fetchTask;
    private volatile SplitFetcherTask runningTask = null;

    private final Object lock = new Object();

 
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook) {
        this.id = id;
       // task queue holding WAKEUP_TASK (used to wake the fetcher thread in special cases) as well as our read tasks
        this.taskQueue = new LinkedBlockingDeque<>();
       // the data that is read gets put into this queue
        this.elementsQueue = elementsQueue;
        this.assignedSplits = new HashMap<>();
        this.splitReader = splitReader;
        this.errorHandler = errorHandler;
        this.shutdownHook = shutdownHook;
        this.isIdle = true;
        this.wakeUp = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
    // wrap the given splitReader into the fetch task so it can be executed directly once the task runs
        this.fetchTask =
                new FetchTask<>(
                        splitReader,
                        elementsQueue,
                        ids -> {
                            ids.forEach(assignedSplits::remove);
                            splitFinishedHook.accept(ids);
                            LOG.info("Finished reading from splits {}", ids);
                        },
                        id);
    }

    @Override
    public void run() {
        LOG.info("Starting split fetcher {}", id);
        try {
            while (!closed.get()) {
              // the logic for each loop iteration
                runOnce();
            }
        } catch (Throwable t) {
            errorHandler.accept(t);
        } finally {
            try {
                splitReader.close();
            } catch (Exception e) {
                errorHandler.accept(e);
            }
            LOG.info("Split fetcher {} exited.", id);
            // This executes after possible errorHandler.accept(t). If these operations bear
            // a happens-before relation, then we can checking side effect of errorHandler.accept(t)
            // to know whether it happened after observing side effect of shutdownHook.run().
            shutdownHook.run();
        }
    }

    /** Package private method to help unit test. */
    void runOnce() {
        try {
            if (shouldRunFetchTask()) {
                runningTask = fetchTask;
            } else {
                runningTask = taskQueue.take();
            }
            
            LOG.debug("Prepare to run {}", runningTask);
           // run the task; next we go into the task itself to see the actual logic
            if (!wakeUp.get() && runningTask.run()) {
                LOG.debug("Finished running task {}", runningTask);
                // the task has finished running. Set it to null so it won't be enqueued.
                runningTask = null;
                checkAndSetIdle();
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    String.format(
                            "SplitFetcher thread %d received unexpected exception while polling the records",
                            id),
                    e);
        }
        // If the task is not null that means this task needs to be re-executed. This only
        // happens when the task is the fetching task or the task was interrupted.
        maybeEnqueueTask(runningTask);
        synchronized (wakeUp) {
            // Set the running task to null. It is necessary for the shutdown method to avoid
            // unnecessarily interrupt the running task.
            runningTask = null;
            // Set the wakeUp flag to false.
            wakeUp.set(false);
            LOG.debug("Cleaned wakeup flag.");
        }
    }

    /* Called when the fetcher is created, or again after it is already running; see the call chain described above */
    public void addSplits(List<SplitT> splitsToAdd) {
        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
        wakeUp(true);
    }
  public void enqueueTask(SplitFetcherTask task) {
        synchronized (lock) {
            taskQueue.offer(task);
            isIdle = false;
        }
    }
}

Let's step into the fetch task; we only need to look at the task logic.

Code language: java
class FetchTask { // (excerpt of the FetchTask used above)
 @Override
    public boolean run() throws IOException {
        try {
            if (!isWakenUp() && lastRecords == null) {
              // What comes back is a MySqlRecords (public final class MySqlRecords implements RecordsWithSplitIds<SourceRecord>)
              // Calls the splitReader passed in when the fetch task was created; it is really the reader's fetch method that pulls the data
                lastRecords = splitReader.fetch();
            }

            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
               // put the fetched records into the queue
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }
        } catch (InterruptedException e) {
            throw new IOException("Source fetch execution was interrupted", e);
        } finally {
            // clear the wakeup flag so the next run is not affected
            if (isWakenUp()) {
                wakeup = false;
            }
        }
        return true;
    }
}

With that we've basically seen how, at the Flink level, the CDC reading code ends up being invoked; now let's look at how the main reading code itself is implemented.

currentReader.pollSplitRecords(): currentReader has two main implementations, BinlogSplitReader and SnapshotSplitReader, and data is read differently depending on the split type. When a split is submitted, a read task is created to read the data of that split, and the results are put into the queue of the StatefulTaskContext; fetch first submits the split so the read starts, then pollSplitRecords calls queue.poll to pull the data, a blocking call that throws an InterruptedException if it times out.

public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
    private final Queue<MySqlSplit> splits;
    private final MySqlSourceConfig sourceConfig;
    private final int subtaskId;
    @Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
    @Nullable private String currentSplitId;
  
    @Override
    public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
        // before fetching, check currentReader and create the matching reader (binlog/snapshot) for the next split
        checkSplitOrStartNext();
        Iterator<SourceRecord> dataIt = null;
        try {
            // delegate to the concrete DebeziumReader to do the work
            // inside the reader, poll is called on the StatefulTaskContext's queue; the call blocks (optionally with a timeout), and if no data arrives within the interval it is interrupted and an InterruptedException is thrown
            dataIt = currentReader.pollSplitRecords(); 
        } catch (InterruptedException e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
        return dataIt == null
                ? finishedSnapshotSplit() // no data read: return an empty result; this also sets currentSplitId to null, marking the split as finished
                : MySqlRecords.forRecords(currentSplitId, dataIt);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        splits.addAll(splitsChanges.splits());
    }

 
    private void checkSplitOrStartNext() throws IOException {
        // the binlog reader should keep alive
        if (currentReader instanceof BinlogSplitReader) {
            return;
        }

        if (canAssignNextSplit()) {
            final MySqlSplit nextSplit = splits.poll();
            if (nextSplit == null) {
                throw new IOException("Cannot fetch from another split - no split remaining");
            }
            currentSplitId = nextSplit.splitId();

            if (nextSplit.isSnapshotSplit()) {
                if (currentReader == null) {
                    final MySqlConnection jdbcConnection =
                            createMySqlConnection(sourceConfig.getDbzConfiguration());
                    final BinaryLogClient binaryLogClient =
                            createBinaryClient(sourceConfig.getDbzConfiguration());
                    final StatefulTaskContext statefulTaskContext =
                            new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                    currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
                }
            } else {
                // point from snapshot split to binlog split
                if (currentReader != null) {
                    LOG.info("It's turn to read binlog split, close current snapshot reader");
                    currentReader.close();
                }
                final MySqlConnection jdbcConnection =
                        createMySqlConnection(sourceConfig.getDbzConfiguration());
                final BinaryLogClient binaryLogClient =
                        createBinaryClient(sourceConfig.getDbzConfiguration());
                final StatefulTaskContext statefulTaskContext =
                        new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
                currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
                LOG.info("BinlogSplitReader is created.");
            }
            // submit a split to the reader; in submitSplit a read task is created and handed to a thread pool, and it reads the split's data into the StatefulTaskContext's queue
            currentReader.submitSplit(nextSplit);
        }
    }
    private boolean canAssignNextSplit() {
        return currentReader == null || currentReader.isFinished();
    }
}

The overall running flow is now covered; what remains is the actual data-reading stage. Let's follow the code and see how the data is really read. Because the CDC code base is large, some parts are skipped; the binlog read task in particular is too involved to walk through line by line, so only its flow is outlined later.

Let's go through the reader-side handling logic step by step.

1. checkSplitOrStartNext → SnapshotSplitReader.submitSplit

The key thing checkSplitOrStartNext does is call submitSplit, which starts the read flow we walk through below.


// -------------------------   SnapshotSplitReader.submitSplit  ------------------------------------------

public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        statefulTaskContext.configure(currentSnapshotSplit);
     // grab the context's queue; pollSplitRecords needs it later
        this.queue = statefulTaskContext.getQueue();
        this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
     // the main read logic lives in the read task
        this.splitSnapshotReadTask =
                new MySqlSnapshotSplitReadTask(
                        statefulTaskContext.getConnectorConfig(),
                        statefulTaskContext.getOffsetContext(),
                        statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                        statefulTaskContext.getDatabaseSchema(),
                        statefulTaskContext.getConnection(),
                        statefulTaskContext.getDispatcher(),
                        statefulTaskContext.getTopicSelector(),
                        StatefulTaskContext.getClock(),
                        currentSnapshotSplit);
     // submit a Runnable to the executor; it mainly runs the read task's execute method
        executor.submit(
                () -> {
                    try {
                        currentTaskRunning = true;
                       // a custom ChangeEventSourceContext impl, used mainly to record the low/high watermark
                        final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                                new SnapshotSplitChangeEventSourceContextImpl();
                       // run the snapshot read task
                        SnapshotResult snapshotResult =
                                splitSnapshotReadTask.execute(sourceContext);

                        final MySqlBinlogSplit backfillBinlogSplit =
                                createBackfillBinlogSplit(sourceContext);
                        // optimization that skip the binlog read when the low watermark equals high
                        // watermark
                       // optimization: snapshot splits are read in parallel; if the low and high watermark of this split are equal, nothing changed while it was being read, so the data in this split's range is unchanged and the binlog backfill after the snapshot can be skipped
                        final boolean binlogBackfillRequired =
                                backfillBinlogSplit
                                        .getEndingOffset()
                                        .isAfter(backfillBinlogSplit.getStartingOffset());
                        if (!binlogBackfillRequired) {
                            dispatchHighWatermark(backfillBinlogSplit);
                            currentTaskRunning = false;
                            return;
                        }

                        // after the snapshot read completes, start the backfill binlog read
                        if (snapshotResult.isCompletedOrSkipped()) {
                           // the low/high watermarks recorded when the snapshot read task finished are used to build the binlog read task
                            final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                    createBackfillBinlogReadTask(backfillBinlogSplit);
                           // run the binlog read task; its internals are too involved to read line by line here.
                           // In short: the snapshot's high watermark is used as endOffset, and the binlog read task
                           // reads every change below endOffset and sends it downstream.
                            backfillBinlogReadTask.execute(
                                    new SnapshotBinlogSplitChangeEventSourceContextImpl());
                        } else {
                            readException =
                                    new IllegalStateException(
                                            String.format(
                                                    "Read snapshot for mysql split %s fail",
                                                    currentSnapshotSplit));
                        }
                    } catch (Exception e) {
                        currentTaskRunning = false;
                        LOG.error(
                                String.format(
                                        "Execute snapshot read task for mysql split %s fail",
                                        currentSnapshotSplit),
                                e);
                        readException = e;
                    }
                });
    }


// -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)  ------------------------------------------
 
 @Override
    public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
        SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset); // just news up a task object
        final SnapshotContext ctx;
        try {
            ctx = prepare(context); // just creates another context object, nothing interesting
        } catch (Exception e) {
            LOG.error("Failed to initialize snapshot context.", e);
            throw new RuntimeException(e);
        }
        try {
           // the code above is mostly boilerplate; the real work happens in doExecute, so let's go there
            return doExecute(context, ctx, snapshottingTask);
        } catch (InterruptedException e) {
            LOG.warn("Snapshot was interrupted before completion");
            throw e;
        } catch (Exception t) {
            throw new DebeziumException(t);
        }
    }

// -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)  ------------------------------------------

 @Override
    protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        ctx.offset = offsetContext;
       // a dispatcher used to emit watermark events; the data itself is emitted later through the emitter
        final SignalEventDispatcher signalEventDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());
    // the log messages below already describe the steps clearly
       // step 1: record the low watermark
        final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                lowWatermark,
                snapshotSplit);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setLowWatermark(lowWatermark);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

        LOG.info("Snapshot step 2 - Snapshotting data");
       // step 2: read the data; this is the key method we look at next
        createDataEvents(ctx, snapshotSplit.getTableId());
    // step 3: record the high watermark
        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                highWatermark,
                snapshotSplit);
        signalEventDispatcher.dispatchWatermarkEvent(
                snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
                .setHighWatermark(highWatermark);

        return SnapshotResult.completed(ctx.offset);
    }
 
// Now let's look at the createDataEvents call chain

private void createDataEvents(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            TableId tableId)
            throws Exception {
        EventDispatcher.SnapshotReceiver snapshotReceiver =
                dispatcher.getSnapshotChangeEventReceiver();
        LOG.debug("Snapshotting table {}", tableId);
        createDataEventsForTable(
                snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
     // we won't read the receiver's code in full here, a short summary is enough:
     // changeRecord() buffers each record in a member field (bufferedEvent): when a new record arrives, the previously buffered one is enqueued and a new SourceRecord supplier is built, so the field always holds the last record read
     // completeSnapshot() checks bufferedEvent and, if it is non-null, finishes it and enqueues it; without this call the snapshot phase of this split would silently lose its last record
        snapshotReceiver.completeSnapshot();
    }

// createDataEvents calls createDataEventsForTable in the same class, which is where the actual reading starts

    private void createDataEventsForTable(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            EventDispatcher.SnapshotReceiver snapshotReceiver,
            Table table)
            throws InterruptedException {

        long exportStart = clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
    
       // build the split-scan SQL
        final String selectSql =
                StatementUtils.buildSplitScanQuery(
                        snapshotSplit.getTableId(),
                        snapshotSplit.getSplitKeyType(),
                        snapshotSplit.getSplitStart() == null,
                        snapshotSplit.getSplitEnd() == null);
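        // For illustration only (an assumption, not copied from StatementUtils): for a middle split
        // on a single key column `id` the generated statement is essentially a bounded range scan,
        // e.g.  SELECT * FROM `db`.`table` WHERE `id` >= ? AND `id` <= ?  with the split's start and
        // end key bound as parameters; the first/last split simply drop the lower/upper bound.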
        LOG.info(
                "For split '{}' of table {} using select statement: '{}'",
                snapshotSplit.splitId(),
                table.id(),
                selectSql);
    
        try (PreparedStatement selectStatement =
                        StatementUtils.readTableSplitDataStatement( // create the statement and run the query
                                jdbcConnection,
                                selectSql,
                                snapshotSplit.getSplitStart() == null,
                                snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),
                                snapshotSplit.getSplitEnd(),
                                snapshotSplit.getSplitKeyType().getFieldCount(),
                                connectorConfig.getQueryFetchSize());
             // each row of the result set is then wrapped into a SourceRecord and sent downstream
                ResultSet rs = selectStatement.executeQuery()) {

            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
            long rows = 0;
            Threads.Timer logTimer = getTableScanLogTimer();

            while (rs.next()) {
                rows++;
                final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
                for (int i = 0; i < columnArray.getColumns().length; i++) {
                    Column actualColumn = table.columns().get(i);
                    row[columnArray.getColumns()[i].position() - 1] =
                            readField(rs, i + 1, actualColumn, table);
                }
                if (logTimer.expired()) {
                    long stop = clock.currentTimeInMillis();
                    LOG.info(
                            "Exported {} records for split '{}' after {}",
                            rows,
                            snapshotSplit.splitId(),
                            Strings.duration(stop - exportStart));
                    snapshotProgressListener.rowsScanned(table.id(), rows);
                    logTimer = getTableScanLogTimer();
                }
                // hand the row to the dispatcher, which passes it to the receiver, which in turn puts it into the queue; the layering is just heavy wrapping, no need to dig too deep
                dispatcher.dispatchSnapshotEvent(
                        table.id(),
                        getChangeRecordEmitter(snapshotContext, table.id(), row), // just news up an emitter
                        snapshotReceiver);
            }
            LOG.info(
                    "Finished exporting {} records for split '{}', total duration '{}'",
                    rows,
                    snapshotSplit.splitId(),
                    Strings.duration(clock.currentTimeInMillis() - exportStart));
        } catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
        }
    }


// -------------------------   the flow after dispatcher.dispatchSnapshotEvent  ----------------------------------

 // step into EventDispatcher.dispatchSnapshotEvent
   public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
        }

        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
            @Override
            public void changeRecord(DataCollectionSchema schema,
                                     Operation operation,
                                     Object key, Struct value,
                                     OffsetContext offset,
                                     ConnectHeaders headers)
                    throws InterruptedException {
                eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);
               // the actual enqueueing is triggered here
               // receiver is the one we passed in, a BufferingSnapshotChangeRecordReceiver
                receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);
            }
        });
    }

  // BufferingSnapshotChangeRecordReceiver.changeRecord
  // its buffering behavior was summarized above, so no further commentary is needed
  @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offsetContext,
                                 ConnectHeaders headers)
                throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");

            LOGGER.trace("Received change record for {} operation on key {}", operation, key);

            if (bufferedEvent != null) {
                queue.enqueue(bufferedEvent.get());
            }

            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());

            // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
            bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(
                        offsetContext.getPartition(),
                        offsetContext.getOffset(),
                        topicName, null,
                        keySchema, key,
                        dataCollectionSchema.getEnvelopeSchema().schema(), value,
                        null, headers);
                return changeEventCreator.createDataChangeEvent(record);
            };
        }
2. pollSplitRecords: the method that pulls data out of the queue

The path from reading the data to writing it into the queue is now clear; next, let's see when the reader drains that queue.

public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
        checkReadException();

        if (hasNextElement.get()) {
            // data input: [low watermark event][snapshot events][high watermark event][binlog
            // events][binlog-end event]
            // data output: [low watermark event][normalized events][high watermark event]
            boolean reachBinlogEnd = false;
            final List<SourceRecord> sourceRecords = new ArrayList<>();
            while (!reachBinlogEnd) {
               // queue.poll() pulls a batch directly; for each event we check whether it is the end-watermark event (i.e. the high watermark) and stop once it is reached (a sketch of the normalization step follows this block)
                List<DataChangeEvent> batch = queue.poll();
                for (DataChangeEvent event : batch) {
                    sourceRecords.add(event.getRecord());
                    if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                        reachBinlogEnd = true;
                        break;
                    }
                }
            }
            // snapshot split return its data once
            hasNextElement.set(false);
            return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
                    .iterator();
        }
        // the data has been polled, no more data
        reachEnd.compareAndSet(false, true);
        return null;
    }
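
The comment at the top of this method says the raw event stream [low watermark][snapshot events][high watermark][binlog events][binlog-end event] is turned into [low watermark][normalized events][high watermark]. The following is a self-contained sketch of what that normalization achieves, written as an assumption about normalizedSplitRecords rather than its actual code.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Sketch of the normalization idea (an assumption, not the real RecordUtils code): snapshot rows
 * are keyed by primary key, and the binlog changes captured between the low and high watermark
 * are upserted into that map, so each split is emitted exactly once with the freshest version of
 * every row as of the high watermark.
 */
class NormalizeSketch {
    static <K, R> List<R> normalize(List<R> snapshotRows, List<R> binlogRows,
                                    Function<R, K> keyOf, Predicate<R> isDelete) {
        Map<K, R> byKey = new LinkedHashMap<>();
        snapshotRows.forEach(r -> byKey.put(keyOf.apply(r), r));   // start from the snapshot image
        for (R change : binlogRows) {                              // replay changes up to the high watermark
            if (isDelete.test(change)) {
                byKey.remove(keyOf.apply(change));                 // the row is gone as of the high watermark
            } else {
                byKey.put(keyOf.apply(change), change);            // insert/update overrides the snapshot row
            }
        }
        return new ArrayList<>(byKey.values());
    }
}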

We have now followed the data from being read to being put into a queue. A note on the two queues involved: queue belongs to the read stage, where the Debezium read task puts the data it produces; elementsQueue is filled by FetchTask, which calls the reader's fetch() to drain queue and then puts the resulting batch into elementsQueue.

With the data now sitting in elementsQueue, let's see when it is sent downstream.

For this we need to go back to MySqlSource (the original post illustrates it with a diagram).

When the reader is created we pass in a MySqlRecordEmitter, and it is this class that later sends the data.

The logic that drives sending data downstream lives in MySqlSourceReader's parent class (SourceReaderBase), but the actual emission is done by the emitter.

Since these methods are invoked by the runtime we won't dig into them here; in short, the runtime calls SourceReaderBase.pollNext(), which triggers collecting the records and sending them to the downstream operators.

Below we read MySqlRecordEmitter directly to see how it emits data; at this point the principle is basically the same as the SourceFunction implementation, so a quick pass is enough. First, a simplified sketch of the poll loop that drives the emitter.
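
The sketch below is heavily simplified and self-contained; it is an assumption about the behavior of SourceReaderBase rather than its source, and the class and method names other than pollNext/emitRecord are invented for illustration.

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.function.Consumer;

/** What the emitter does with one raw element (deserialize + collect). */
interface RecordEmitterSketch<E, T> {
    void emitRecord(E element, Consumer<T> output);
}

/**
 * Self-contained sketch of the pollNext idea (an assumption, not Flink's actual SourceReaderBase):
 * fetched batches land in a queue, pollNext takes one record at a time from the current batch and
 * hands it to the emitter, moving on to the next batch when one is exhausted.
 */
class PollNextSketch<E, T> {
    private final Queue<Iterator<E>> elementsQueue = new ArrayDeque<>(); // filled by the fetcher thread
    private Iterator<E> currentBatch;

    void addBatch(Iterator<E> batch) {
        elementsQueue.add(batch);
    }

    /** Returns false when nothing is available right now; true when a record was emitted. */
    boolean pollNext(RecordEmitterSketch<E, T> emitter, Consumer<T> output) {
        if (currentBatch == null || !currentBatch.hasNext()) {
            currentBatch = elementsQueue.poll();               // move on to the next fetched batch, if any
            if (currentBatch == null || !currentBatch.hasNext()) {
                currentBatch = null;
                return false;                                  // "NOTHING_AVAILABLE" in the real API
            }
        }
        emitter.emitRecord(currentBatch.next(), output);        // MySqlRecordEmitter.emitRecord plays this role
        return true;                                            // "MORE_AVAILABLE" in the real API
    }
}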

public final class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecord, T, MySqlSplitState> {

    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
            new FlinkJsonTableChangeSerializer();

    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final MySqlSourceReaderMetrics sourceReaderMetrics;
    private final boolean includeSchemaChanges;
    private final OutputCollector<T> outputCollector;

    public MySqlRecordEmitter(
            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            MySqlSourceReaderMetrics sourceReaderMetrics,
            boolean includeSchemaChanges) {
       // the deserialization schema; it is the same class used by the single-parallelism source, its internals are worth reading on your own
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.sourceReaderMetrics = sourceReaderMetrics;
        this.includeSchemaChanges = includeSchemaChanges;
        this.outputCollector = new OutputCollector<>();
    }

    @Override
    public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
            throws Exception {
      
        // check the event type: data-change records are emitted downstream, other event types get their own handling
        if (isWatermarkEvent(element)) {
            BinlogOffset watermark = getWatermark(element);
            if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
            HistoryRecord historyRecord = getHistoryRecord(element);
            Array tableChanges =
                    historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
            TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
            for (TableChanges.TableChange tableChange : changes) {
                splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
            }
            if (includeSchemaChanges) {
                emitElement(element, output);
            }
        } else if (isDataChangeRecord(element)) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = getBinlogPosition(element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
            reportMetrics(element);
            emitElement(element, output);
        } else {
            // unknown element
            LOG.info("Meet unknown element {}, just skip.", element);
        }
    }
 
    
    private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
        outputCollector.output = output;
       // the configured DebeziumDeserializationSchema.deserialize converts the record and collects it downstream
        debeziumDeserializationSchema.deserialize(element, outputCollector);
    }
  
    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;
        @Override
        public void collect(T record) {
            output.collect(record);
        }
    }
}

四. A side note <Table-related content>

// A brief look at the RowData deserialization schema, i.e. the DeserializationSchema used above
public final class RowDataDebeziumDeserializeSchema
        implements DebeziumDeserializationSchema<RowData> {
      @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
       // for each operation type we need the corresponding payload:
       // "after" is the row after the change, "before" is the row before the change
       // only UPDATE needs both before and after (think about why)
       // each operation is tagged with a RowKind; the SQL layer reacts to that tag, e.g. applying an insert or an update
       // so on the Table side we only need to convert the data into the matching RowData and set its RowKind; the framework
       // performs the matching action in the sink, no extra code is needed on our side
       // in other words, to use CDC in SQL we just attach the RowKind and everything downstream is taken care of
       // (a sketch of a minimal custom DebeziumDeserializationSchema follows this class)

       // when the SQL format is json the concrete code path is not covered here; read the implementation yourself if you are interested
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
           // extractAfterRow builds the RowData; its body is verbose and not shown here.
           // Roughly, it converts each column value into the corresponding Flink internal type
           // (Flink wraps the Java types again so that all DB column types can be represented)
            GenericRowData insert = extractAfterRow(value, valueSchema);
            validator.validate(insert, RowKind.INSERT);
            insert.setRowKind(RowKind.INSERT);
           // emit() sends the row downstream
            emit(record, insert, out);
        } else if (op == Envelope.Operation.DELETE) {
            GenericRowData delete = extractBeforeRow(value, valueSchema);
            validator.validate(delete, RowKind.DELETE);
            delete.setRowKind(RowKind.DELETE);
            emit(record, delete, out);
        } else {
            GenericRowData before = extractBeforeRow(value, valueSchema);
            validator.validate(before, RowKind.UPDATE_BEFORE);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            emit(record, before, out);

            GenericRowData after = extractAfterRow(value, valueSchema);
            validator.validate(after, RowKind.UPDATE_AFTER);
            after.setRowKind(RowKind.UPDATE_AFTER);
            emit(record, after, out);
        }
    }
  
}
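
For comparison, writing our own deserializer only requires implementing the two methods of DebeziumDeserializationSchema. The sketch below emits each SourceRecord as a String, which is handy for inspecting what Debezium actually produces; the connector ships a similar StringDebeziumDeserializationSchema, and the package names here assume the com.ververica artifacts.

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Minimal custom deserializer sketch: instead of RowData it emits the raw SourceRecord as a
 * String. The two overridden methods are the whole contract of DebeziumDeserializationSchema.
 */
public class RecordToStringDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // SourceRecord#toString already contains the topic, key and the before/after Structs
        out.collect(record.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Such a schema would then be handed to the source through the builder's deserializer(...) option shown at the beginning of the article.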

If we want to implement our own table source, we need to implement DynamicTableSourceFactory, provide the methods below, and register the factory so that it can be loaded dynamically via SPI (a skeleton sketch follows the list).

  • createDynamicTableSource : creates the concrete source
  • factoryIdentifier : the connector's identifier, e.g. kafka
  • requiredOptions : options that must appear in the WITH clause, e.g. username and password; an exception is thrown if one is missing
  • optionalOptions : options in the WITH clause that are optional
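
As an illustration of those four methods, here is a skeleton sketch of such a factory. The connector identifier "demo-cdc", the option set, and the placeholder MyDynamicTableSource are all hypothetical; only the DynamicTableSourceFactory/FactoryUtil usage reflects Flink's actual interfaces.

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Skeleton sketch of a table source factory; "demo-cdc" and MyDynamicTableSource are hypothetical. */
public class DemoCdcTableSourceFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue().withDescription("Database host.");
    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username").stringType().noDefaultValue().withDescription("Database user.");
    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password").stringType().noDefaultValue().withDescription("Database password.");
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().defaultValue(3306).withDescription("Database port.");

    @Override
    public String factoryIdentifier() {
        return "demo-cdc"; // the value used in WITH ('connector' = 'demo-cdc')
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(USERNAME);
        options.add(PASSWORD);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PORT);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate(); // fails if a required option is missing or an unknown option is present
        // MyDynamicTableSource is a placeholder for your own ScanTableSource implementation
        throw new UnsupportedOperationException(
                "replace with: return new MyDynamicTableSource(helper.getOptions());");
    }
}

For SQL to discover the factory, its fully qualified class name must also be listed in a META-INF/services/org.apache.flink.table.factories.Factory file, which is exactly what the resources directory mentioned in the project structure is for.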