前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Mysql CDC 统计处理

Flink Mysql CDC 统计处理

原创
作者头像
平常心
修改2021-08-16 18:12:20
4.2K1
修改2021-08-16 18:12:20
举报
文章被收录于专栏:个人总结系列

1.环境准备

1.1 mysql 开启binlog

代码语言:javascript
复制
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30

1.2 flink的cdc依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

说明: 该依赖已经内置了debezium进行处理mysql 变更数据并发送了,所以我们不需要额外的方式,简化了异常 mysql → debezium → kafka的这种方式和数据流程。

2.代码开发

2.1 数据库和表准备

代码语言:javascript
复制
CREATE TABLE t_students (
    `id` BIGINT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(24)  DEFAULT NULL,
    `age` INT(4) DEFAULT NULL,
    `create_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
    PRIMARY KEY (`id`) USING BTREE
)
;

2.2 flink代码编写

2.2.1 stream api方式

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
 
final DebeziumDeserializationSchema deserializationSchema = new StringDebeziumDeserializationSchema();
 
final DebeziumSourceFunction sourceFunction = MySQLSource.builder()
         .hostname("127.0.0.1").port(3306)
         .databaseList("flink_cdc")
         .username("root")
         .password("123456")
         .deserializer(deserializationSchema)
         .build();
 
env.addSource(sourceFunction).setParallelism(1).print();

说明:这种不是很方便,数据解析也比较麻烦

2.2.2 table api 方式

代码语言:javascript
复制
final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        String ddlMysql = "CREATE TABLE mysql_binlog (id INT NOT NULL, " +
                "name STRING, " +
                "age INT, " +
                "create_time STRING, " +
                "update_time STRING " +
                ") " +
                "WITH ('connector' = 'mysql-cdc', " +
                "'hostname' = '127.0.0.1', " +
                "'port' = '3306', " +
                "'username' = 'root', " +
                "'password' = '123456', " +
                "'database-name' = 'flink_cdc', " +
                "'table-name' = 't_students')\n"
                ;
        tabEnv.executeSql(ddlMysql);
 
//        String sink = "CREATE TABLE sink_table (id INT NOT NULL, " +
//                "name STRING, " +
//                "age INT, " +
//                "create_time STRING, " +
//                "update_time STRING " +
//                ") " +
//                "WITH ('connector' = 'print')"
//                ;
//        tabEnv.executeSql(sink);
//
//        String dml = "INSERT INTO sink_table SELECT id, name, age, create_time, update_time FROM mysql_binlog";
//
//        final TableResult result = tabEnv.executeSql(dml);
 
        final TableResult result = tabEnv.executeSql("select * from mysql_binlog");
        result.print();

说明:cdc 最开始发起人是 吴邪,所以是通过table api的方式处理的,目前已经代码实现了很多对应的逻辑处理,方便使用和统计。

3.效果展示

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.环境准备
    • 1.1 mysql 开启binlog
      • 1.2 flink的cdc依赖
      • 2.代码开发
        • 2.1 数据库和表准备
          • 2.2 flink代码编写
          • 3.效果展示
          相关产品与服务
          大数据处理套件 TBDS
          腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档