前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink CDC

Flink CDC

作者头像
挽风
发布2023-12-18 11:27:44
2530
发布2023-12-18 11:27:44
举报
文章被收录于专栏:小道小道

一、什么是CDC?

CDC是Change Data Capture(变更数据获取)的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

二、CDC 种类

  CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:

基于查询的CDC

基于Binlog的CDC

开源产品

Sqoop、Kafka JDBC Source

Canal、Maxwell、Debezium

执行模式

Batch

Streaming

是否可以捕获所有数据变化

延迟性

高延迟

低延迟

是否增加数据库压力

  Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。

三、Flink CDC案例

3.1 DataStream方式的应用

3.1.1 导入依赖
代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
</dependency>

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
</dependency>

<dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
</dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
3.1.2 编写代码
代码语言:javascript
复制
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序
        //2.1 开启Checkpoint,每隔5秒钟做一次CK
        env.enableCheckpointing(5000L);
        //2.2 指定CK的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次CK数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        //2.6 设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME", "xxx");

        //3.创建Flink-MySQL-CDC的Source
        //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
        //latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
        //timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
        //specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("database")
                .tableList("database.tablename") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4.使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //5.打印数据
        mysqlDS.print();

        //6.执行任务
        env.execute();

    }
}

3.2 FlinkSQL方式的应用

  代码实现

代码语言:javascript
复制
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {

    public static void main(String[] args) throws Exception {

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.创建Flink-MySQL-CDC的Source
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id INT," +
                "  name STRING," +
                "  phone_num STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '000000'," +
                "  'database-name' = 'database'," +
                "  'table-name' = 'table-name'" +
                ")");

        tableEnv.executeSql("select * from user_info").print();

        env.execute();
    }

}

3.3 自定义反序列化器

代码语言:javascript
复制
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Properties;

public class Flink_CDCWithCustomerSchema {

    public static void main(String[] args) throws Exception {

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.创建Flink-MySQL-CDC的Source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("flink")
                .tableList("flink.user_info")//可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
				.startupOptions(StartupOptions.initial())
                .deserializer(new DebeziumDeserializationSchema<String>() {  //自定义数据解析器
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

                        //获取主题信息,包含着数据库和表名  mysql_binlog_source.flink.user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];

                        //获取操作类型 READ DELETE UPDATE CREATE
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

                        //获取值信息并转换为Struct类型
                        Struct value = (Struct) sourceRecord.value();

                        //获取变化后的数据
                        Struct after = value.getStruct("after");

                        //创建JSON对象用于存储数据信息
                        JSONObject data = new JSONObject();
                        for (Field field : after.schema().fields()) {
                            Object o = after.get(field);
                            data.put(field.name(), o);
                        }

                        //创建JSON对象用于封装最终返回值数据信息
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);

                        //发送数据至下游
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        //3.使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //4.打印数据
        mysqlDS.print();

        //5.执行任务
        env.execute();
    }
}

四、Flink-CDC 2.0

4.1 Flink-CDC 1.x痛点

在这里插入图片描述
在这里插入图片描述

4.2 Flink-CDC 2.0 设计

在这里插入图片描述
在这里插入图片描述

4.3 Flink-CDC 2.0 设计实现

  整体概览

在这里插入图片描述
在这里插入图片描述

  在对于有主键的表做初始化模式,整体的流程主要分为5个阶段:

  1.Chunk切分;

  根据Netflix DBlog的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性

在这里插入图片描述
在这里插入图片描述

  2.Chunk分配;(实现并行读取数据&CheckPoint

  将划分好的Chunk分发给多个 SourceReader,每个SourceReader读取表中的一部分数据,实现了并行读取的目标。

  同时在每个Chunk读取的时候可以单独做CheckPoint,某个Chunk读取失败只需要单独执行该Chunk的任务,而不需要像1.x中失败了只能从头读取。

  若每个SourceReader保证了数据一致性,则全表就保证了数据一致性。

在这里插入图片描述
在这里插入图片描述

  3.Chunk读取;(实现无锁读取

  读取可以分为5个阶段

    1)SourceReader读取表数据之前先记录当前的Binlog位置信息记为低位点;

    2)SourceReader将自身区间内的数据查询出来并放置在buffer中;

    3)查询完成之后记录当前的Binlog位置信息记为高位点;

    4)在增量部分消费从低位点到高位点的Binlog;

    5)根据主键,对buffer中的数据进行修正并输出。

  通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个Chunk中的数据一致性。

在这里插入图片描述
在这里插入图片描述

  4.Chunk汇报;

  在Snapshot Chunk读取完成之后,有一个汇报的流程,如上图所示,即SourceReader需要将Snapshot Chunk完成信息汇报给SourceEnumerator。

在这里插入图片描述
在这里插入图片描述

  5.Chunk分配。

  FlinkCDC是支持全量+增量数据同步的,在SourceEnumerator接收到所有的Snapshot Chunk完成信息之后,还有一个消费增量数据(Binlog)的任务,此时是通过下发Binlog Chunk给任意一个SourceReader进行单并发读取来实现的。

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、什么是CDC?
  • 二、CDC 种类
  • 三、Flink CDC案例
    • 3.1 DataStream方式的应用
      • 3.1.1 导入依赖
      • 3.1.2 编写代码
    • 3.2 FlinkSQL方式的应用
      • 3.3 自定义反序列化器
      • 四、Flink-CDC 2.0
        • 4.1 Flink-CDC 1.x痛点
          • 4.2 Flink-CDC 2.0 设计
            • 4.3 Flink-CDC 2.0 设计实现
            相关产品与服务
            数据库
            云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档