前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目

Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目

作者头像
大数据学习指南
发布2022-05-26 08:56:37
5.3K0
发布2022-05-26 08:56:37
举报
文章被收录于专栏:同名公众号:大数据学习指南

本文将演示如何使用 Flink DataStream API 开发一个 Flink CDC 应用。

本文的目标:

1.体验如何使用 Flink Stream API 开发一个 Flink CDC Demo,超级简单。

2.以Mysql为例,采集Mysql binlog数据。账号需要什么权限?需要注意什么?

3.生成 checkpoint 数据,重启程序从执行的状态恢复数据。

4.演示2.2版本动态加加载表的新特性,在2.1版本是一个BUG。

Flink CDC 使用 SQL 的方式,可以非常快速的开始一个 Flink CDC 的任务,就像下面这样:

下面开始,我使用Flink代码写一个简单的 Flink CDC 应用

第一步,创建一个 Flink 空项目

代码语言:javascript
复制
mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.13.3

第二步,引入 Flink CDC 相关的依赖

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<!--  flink-cdc-mysql  -->
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.2.1</version>
</dependency>

第三步,编写 Flink 代码

代码如下:

代码语言:javascript
复制
package test;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class MysqlCDC {
    public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("decimal.handling.mode", "String");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .scanNewlyAddedTableEnabled(true) // 开启支持新增表
                .databaseList("user") // set captured database
                .tableList("user.user_1,user.user_2,user.user_3") // set captured table
                .username("test_cdc")
                .password("tsl")
                .debeziumProperties(debeziumProperties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        Configuration configuration = new Configuration();
        // 生产环境夏下,改成参数传进来
        configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // enable checkpoint
        env.enableCheckpointing(3000);
        // 设置本地
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

MySqlSource 的账号需要有SELECT、REPLICATION SLAVE、REPLICATION CLIENT的权限

代码语言:javascript
复制
SELECT 权限代表允许从表中查看数据
REPLICATION SLAVE 权限代表允许执行show master status,show slave status,show binary logs命令
REPLICATION CLIENT 权限代表允许slave主机通过此用户连接master以便建立主从 复制关系

开启 Checkpoint 之后,重启的时候需要执行从哪一个状态恢复,这样可以采集任务从上一次的位置开始。

Flink 2.2 也支持了动态新增表,需要手动在程序里面开启这个功能[1]。见代码第 18 行。这样在我们停掉任务之后,程序里面新增了一张表,这样从上一次状态恢复的时候,其他已有的表可以接着上次的状态开始采集,新增的这一张表,从全量+增量开始。

[1]https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#scan-newly-added-tables

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据学习指南 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档