
How to Use Flink CDC for Incremental Backup of Data into ClickHouse

Author: 麒思妙想 | Published 2021-07-19
I have been putting off the CDC topic for a long time; today it is finally time to fill that gap. In this article we first introduce what CDC is and how to choose a CDC tool, then show how to capture changes from MySQL with Flink CDC and sink them into ClickHouse, and finally cover the Flink SQL CDC approach.

CDC

First, what is CDC? It stands for Change Data Capture. With CDC we can capture committed changes from a database and deliver them downstream for consumption. These changes include INSERT, DELETE, and UPDATE operations.

Its main application scenarios:

  • Data synchronization or backup between heterogeneous databases / building data analytics platforms
  • Sharing data state between microservices
  • Updating caches / updating CQRS query views

CDC is a fairly broad concept: any mechanism that captures changed data can be called CDC. The industry mainly distinguishes query-based CDC and log-based CDC; the table below compares their capabilities and differences, and a short sketch after the table makes the query-based approach concrete.

|  | Query-based CDC | Log-based CDC |
| --- | --- | --- |
| Concept | Each capture issues a SELECT over the whole table and filters out the rows that changed between queries | Continuously monitors and reads the data store's log, e.g. MySQL's binlog |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Low latency without adding database load | No | Yes |
| Non-intrusive to the business (no LastUpdated column needed) | No | Yes |
| Captures delete events and old record state | No | Yes |
| Captures old record state | No | Yes |
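To make the trade-offs concrete, here is a minimal sketch of the query-based approach (illustration only, not this article's method), assuming a hypothetical last_updated column that the business code would have to maintain; the table and column names are assumptions:

```java
import java.sql.*;

// Query-based CDC sketch: poll and filter on a hypothetical last_updated column.
// Deletes and any intermediate states between two polls are invisible to it.
public class PollingCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastPoll = new Timestamp(0L);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "flinkcdc", "dafei1288")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, description, last_updated" +
                        " FROM test_cdc WHERE last_updated > ?")) {
                    ps.setTimestamp(1, lastPoll);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println("changed row id = " + rs.getInt("id"));
                            lastPoll = rs.getTimestamp("last_updated");
                        }
                    }
                }
                Thread.sleep(5_000); // the polling interval is this approach's latency floor
            }
        }
    }
}
```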

Debezium

Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). You install and configure Debezium to monitor your databases, and your applications can then consume every row-level change made to them. Only committed changes are visible, so your application does not need to worry about transactions or changes that get rolled back. Debezium provides a unified model for all change events, so your application does not have to deal with the intricacies of each database management system. In addition, because Debezium records the history of data changes in durable, replicated logs, your application can be stopped and restarted at any time without missing the events that occurred while it was down, guaranteeing that every event is processed correctly and completely.

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

Why Debezium? A screenshot of a chat with a NetEase expert explains it better than I can. (Thanks, 简佬, for agreeing to appear.)

ClickHouse

ClickHouse is a real-time analytical database developed by Yandex (the "Google of Russia"); it is recommended for OLAP scenarios.

ClickHouse's advantages:

  1. A true column-oriented DBMS. ClickHouse is a DBMS, not a single database: it allows creating tables and databases, loading data, and running queries at runtime without reconfiguring and restarting the server.
  2. Data compression. Some column-oriented DBMSs (InfiniDB CE and MonetDB) do not compress data, yet compression genuinely improves performance.
  3. Disk-based data storage.
  4. Distributed processing across multiple servers.
  5. SQL support.
  6. Data is not only stored by column but also processed in vectors (chunks of columns), which enables high CPU efficiency.

ClickHouse's disadvantages:

  1. No full transaction support.
  2. No full UPDATE/DELETE support: it cannot modify or delete existing data with high frequency and low latency; existing data can only be modified or deleted in bulk.
  3. Aggregation results must fit in the memory of a single machine.
  4. Not suitable as a key-value store.

When should you not use ClickHouse?

  1. Transactional workloads (OLTP)
  2. High-concurrency key-value access
  3. Blob or document storage
  4. Over-normalized data

Flink CDC

The Flink CDC connector consumes the change records Debezium produces, processes them, and sinks them onward; the overall flow is fairly simple.

First, create the Source and Sink (the corresponding dependency declarations are at the end of this article). Note that the MySQL server must have binlog enabled in ROW format, and the connecting user needs replication privileges, because the CDC source reads the binlog.

```java
       SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .username("flinkcdc")
                .password("dafei1288")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // add the source, then attach the sink
        env.addSource(sourceFunction)
                .addSink(new ClickhouseSink());
```
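The snippet above leaves out the execution environment. Below is a minimal sketch of the missing boilerplate; enabling checkpointing is my own addition, but it is what allows the CDC source to resume from the recorded binlog offset after a failure:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing lets the CDC source recover from the last committed binlog offset.
env.enableCheckpointing(3000); // interval in ms, an assumed value; tune for your workload
env.addSource(sourceFunction).addSink(new ClickhouseSink());
env.execute("mysql2clickhouse");
```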

The JsonDebeziumDeserializationSchema used here is a deserialization class we define ourselves, which turns the records Debezium emits into JSON strings:

```java
// Deserialize the CDC records into JSON strings
    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

            Gson jsstr = new Gson();
            HashMap<String, Object> hs = new HashMap<>();

            String topic = sourceRecord.topic();
            String[] split = topic.split("[.]");
            String database = split[1];
            String table = split[2];
            hs.put("database",database);
            hs.put("table",table);
            // get the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // get the record payload itself
            Struct struct = (Struct) sourceRecord.value();
            Struct after = struct.getStruct("after");

            if (after != null) {
                Schema schema = after.schema();
                HashMap<String, Object> afhs = new HashMap<>();
                for (Field field : schema.fields()) {
                    afhs.put(field.name(), after.get(field.name()));
                }
                hs.put("data",afhs);
            }

            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
            hs.put("type",type);

            collector.collect(jsstr.toJson(hs));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
```
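Note that this schema reads only the `after` struct, so for delete events, where Debezium populates only `before`, the emitted JSON carries no `data` field. If you also need the old row state, a hedged extension inside deserialize could read the `before` struct the same way:

```java
// Sketch (not in the original): also capture the old row state for updates/deletes.
Struct before = struct.getStruct("before");
if (before != null) {
    HashMap<String, Object> bfhs = new HashMap<>();
    for (Field field : before.schema().fields()) {
        bfhs.put(field.name(), before.get(field.name()));
    }
    hs.put("old", bfhs); // "old" is a hypothetical key name
}
```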

Each change record is serialized into JSON of the following form:

```json
{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}

Next we create the Sink, which writes the captured changes into ClickHouse; here we handle only insert as an example.

```java
public static class ClickhouseSink extends RichSinkFunction<String>{
        Connection connection;
        PreparedStatement pstmt;
        private Connection getConnection() {
            Connection conn = null;
            try {
                Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
                String url = "jdbc:clickhouse://localhost:8123/default";
                conn = DriverManager.getConnection(url,"default","dafei1288");

            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
            pstmt = connection.prepareStatement(sql);
        }

        // invoked once for every incoming record
        @Override
        public void invoke(String value, Context context) throws Exception {
            //{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}
            Gson t = new Gson();
            HashMap<String,Object> hs = t.fromJson(value,HashMap.class);
            String database = (String)hs.get("database");
            String table = (String)hs.get("table");
            String type = (String)hs.get("type");

            if("test".equals(database) && "test_cdc".equals(table)){
                if("insert".equals(type)){
                    System.out.println("insert => "+value);
                    LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");
                    String name = (String)data.get("name");
                    String description = (String)data.get("description");
                    Double id = (Double)data.get("id");
                    // bind the placeholders declared in open()
                    pstmt.setInt(1, id.intValue());
                    pstmt.setString(2, name);
                    pstmt.setString(3, description);

                    pstmt.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            super.close();

            if(pstmt != null) {
                pstmt.close();
            }

            if(connection != null) {
                connection.close();
            }
        }
    }
```
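The sink assumes sink_ch_test already exists in ClickHouse. The article does not show its DDL, so here is a plausible one-off setup sketch, with the schema inferred from the INSERT statement; the MergeTree engine and the ORDER BY key are assumptions:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// One-off setup: create the target table via the same ClickHouse JDBC driver.
// Schema inferred from the INSERT above; engine and ordering key are assumptions.
public class CreateSinkTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://localhost:8123/default", "default", "dafei1288");
             Statement stmt = conn.createStatement()) {
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS sink_ch_test (" +
                "  id Int32," +
                "  name String," +
                "  description String" +
                ") ENGINE = MergeTree() ORDER BY id");
        }
    }
}
```

Also worth noting: ClickHouse strongly prefers large batched inserts, so the row-at-a-time executeUpdate above is fine for a demo, but a production sink would buffer records and flush them in batches.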

The complete code example:

```java
package name.lijiaqi.cdc;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .username("flinkcdc")
                .password("dafei1288")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // add the source, then attach the sink
        env.addSource(sourceFunction)
                .addSink(new ClickhouseSink());

        env.execute("mysql2clickhouse");
    }

    // Deserialize the CDC records into JSON strings
    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

            Gson jsstr = new Gson();
            HashMap<String, Object> hs = new HashMap<>();

            String topic = sourceRecord.topic();
            String[] split = topic.split("[.]");
            String database = split[1];
            String table = split[2];
            hs.put("database",database);
            hs.put("table",table);
            // get the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            // get the record payload itself
            Struct struct = (Struct) sourceRecord.value();
            Struct after = struct.getStruct("after");

            if (after != null) {
                Schema schema = after.schema();
                HashMap<String, Object> afhs = new HashMap<>();
                for (Field field : schema.fields()) {
                    afhs.put(field.name(), after.get(field.name()));
                }
                hs.put("data",afhs);
            }

            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
            hs.put("type",type);

            collector.collect(jsstr.toJson(hs));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }


    public static class ClickhouseSink extends RichSinkFunction<String>{
        Connection connection;
        PreparedStatement pstmt;
        private Connection getConnection() {
            Connection conn = null;
            try {
                Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
                String url = "jdbc:clickhouse://localhost:8123/default";
                conn = DriverManager.getConnection(url,"default","dafei1288");

            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
            pstmt = connection.prepareStatement(sql);
        }

        // invoked once for every incoming record
        @Override
        public void invoke(String value, Context context) throws Exception {
            //{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}
            Gson t = new Gson();
            HashMap<String,Object> hs = t.fromJson(value,HashMap.class);
            String database = (String)hs.get("database");
            String table = (String)hs.get("table");
            String type = (String)hs.get("type");

            if("test".equals(database) && "test_cdc".equals(table)){
                if("insert".equals(type)){
                    System.out.println("insert => "+value);
                    LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");
                    String name = (String)data.get("name");
                    String description = (String)data.get("description");
                    Double id = (Double)data.get("id");
                    // bind the placeholders declared in open()
                    pstmt.setInt(1, id.intValue());
                    pstmt.setString(2, name);
                    pstmt.setString(3, description);

                    pstmt.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            super.close();

            if(pstmt != null) {
                pstmt.close();
            }

            if(connection != null) {
                connection.close();
            }
        }
    }
}
```

Run the job and check the results: the data arrives in ClickHouse successfully.
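To verify, query ClickHouse directly, either with clickhouse-client or, as a quick sketch, through the same JDBC driver:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Quick verification sketch: count the rows that arrived in the sink table.
public class VerifySink {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://localhost:8123/default", "default", "dafei1288");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count(*) FROM sink_ch_test")) {
            if (rs.next()) {
                System.out.println("rows in sink_ch_test: " + rs.getLong(1));
            }
        }
    }
}
```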

Flink SQL CDC

Next, let's look at how to implement CDC with Flink SQL; three SQL statements are all it takes.

Create the source table:

```java
        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = 'dafei1288',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";

Create the sink table:

```java
 // sink (target) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED \n " +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + mysqlSinkTable + "'\n" +
                        ")";

Here we simply funnel the data straight across. Because the sink table declares a primary key, the JDBC connector writes in upsert mode, so updates and deletes captured from the source are applied to the target as well.

```java
// a plain insert-select with no transformation
        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";
```

The complete reference code:

```java
package name.lijiaqi.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = 'dafei1288',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";


        String url = "jdbc:mysql://127.0.0.1:3306/test";
        String userName = "root";
        String password = "dafei1288";
        String mysqlSinkTable = "test_cdc_sink";
        // sink (target) table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED \n " +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + mysqlSinkTable + "'\n" +
                        ")";
        // a plain insert-select with no transformation
        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // executeSql has already submitted the insert job, so env.execute() is not
        // needed here (with no DataStream operators it would fail); print() surfaces
        // the submission result while the flink-cdc snapshot runs
        result.print();
    }

}
```

Run it and check the execution results.

Add the dependencies:

```xml
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-jdbc_2.12</artifactId>-->
<!--            <version>1.10.3</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.0</version>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>


        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.6</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
    </dependencies>
```

References:

https://blog.csdn.net/zhangjun5965/article/details/107605396

https://cloud.tencent.com/developer/article/1745233?from=article.detail.1747773

https://segmentfault.com/a/1190000039662261

https://www.cnblogs.com/weijiqian/p/13994870.html
