
[BUG] A Mysterious Exception in Flink CDC 2.0.0!!!

Author: 857 Tech Community
Published 2022-05-17 15:59:14
1. Scenario

Use the Flink CDC SQL API to capture MySQL binlog changes in real time and send them to Kafka.

2. Framework Versions

| Framework | Version |
| --- | --- |
| Flink | 1.13.2 |
| MySQL | 5.7.25 |
| connector-mysql-cdc | 2.0.0 |

3. Test Code

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCWithSqlTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 2 minutes with exactly-once semantics.
        env.enableCheckpointing(120000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Retain the checkpoint on cancellation so the job can later be restarted from it.
        env.getCheckpointConfig()
            .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: reads the MySQL binlog via the mysql-cdc connector.
        String sourceDDL = "create table cdc_from_mysql(" +
                "  id      INT                      " +
                " ,name    STRING                   " +
                " ,PRIMARY KEY(id)     NOT ENFORCED " +
                " ) WITH ( " +
                " 'connector'                         = 'mysql-cdc'      ," +
                " 'scan.startup.mode'                 = 'latest-offset'  ," +
                " 'server-time-zone'                  = 'Asia/Shanghai'  ," +
                " 'scan.incremental.snapshot.enabled' = 'true'           ," +
                " 'hostname'                          = 'mysql_ip'       ," +
                " 'port'                              = 'mysql_port'     ," +
                " 'username'                          = 'mysql_username' ," +
                " 'password'                          = 'mysql_password' ," +
                " 'database-name'                     = 'mysql_database' ," +
                " 'table-name'                        = 'mysql_table'    ," +
                " 'server-id'                         = '5400'       " +
                " ) ";
        tableEnv.executeSql(sourceDDL);

        // Sink table: writes changelog records to Kafka via upsert-kafka.
        String sinkDDL = "create table cdc_to_kafka(" +
                "  id      INT                      " +
                " ,name    STRING                   " +
                " ,PRIMARY KEY(id)     NOT ENFORCED " +
                " ) WITH ( " +
                " 'connector'                    = 'upsert-kafka'       ," +
                " 'topic'                        = 'ZGN_CDC_TEST'       ," +
                " 'properties.bootstrap.servers' = 'kafka_ip:9092' ," +
                " 'key.json.ignore-parse-errors' = 'true'               ," +
                " 'key.format'                   = 'json'               ," +
                " 'value.format'                 = 'json'               ," +
                " 'value.fields-include'         = 'ALL'                 " +
                " ) ";
        tableEnv.executeSql(sinkDDL);

        tableEnv.executeSql("INSERT INTO cdc_to_kafka SELECT * FROM cdc_from_mysql");
    }
}
```
4. Bug Reproduction
1. First, insert a few rows into MySQL
1.1 MySQL side

| id | name |
| --- | --- |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
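The rows above can be produced with plain INSERT statements. The database and table names here are placeholders, matching the masked `mysql_database` / `mysql_table` options in the DDL above:

```sql
-- Placeholders: substitute your actual database and table names.
INSERT INTO mysql_table (id, name) VALUES (1, '1');
INSERT INTO mysql_table (id, name) VALUES (2, '2');
INSERT INTO mysql_table (id, name) VALUES (3, '3');
```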

1.2 Consuming the Kafka topic from the console

```shell
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
```
2. Simulate a Flink job failure (stop the Flink job)

Here I simply cancelled the job through the Flink Web UI.

3. Continue inserting rows into MySQL

| id | name |
| --- | --- |
| 1 (previous batch) | 1 (previous batch) |
| 2 (previous batch) | 2 (previous batch) |
| 3 (previous batch) | 3 (previous batch) |
| 4 (this batch) | 4 (this batch) |
| 5 (this batch) | 5 (this batch) |
| 6 (this batch) | 6 (this batch) |

4. Restart the Flink job from the checkpoint and keep watching the Kafka topic from the console

```shell
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}

---------------- job stop/restart boundary ------------------

{"id":4,"name":"4"}
{"id":5,"name":"5"}
{"id":6,"name":"6"}
{"id":4,"name":"4"}    -- anomaly: duplicate consumption
{"id":5,"name":"5"}    -- anomaly: duplicate consumption
{"id":6,"name":"6"}    -- anomaly: duplicate consumption
{"id":4,"name":"4"}    -- anomaly: duplicate consumption
{"id":5,"name":"5"}    -- anomaly: duplicate consumption
{"id":6,"name":"6"}    -- anomaly: duplicate consumption
{"id":4,"name":"4"}    -- anomaly: duplicate consumption
{"id":5,"name":"5"}    -- anomaly: duplicate consumption
{"id":6,"name":"6"}    -- anomaly: duplicate consumption
......
```
5. Error Log

```text
java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
......
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:1800034. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
......
```
6. Troubleshooting
1. Judging from the Kafka data seen at the console

The expectation is that {"id":4,"name":"4"}, {"id":5,"name":"5"} and {"id":6,"name":"6"} are each consumed exactly once, yet in fact they keep being re-consumed. The suspicion is that after the restart, the Flink CDC job cannot correctly interpret the checkpoint information stored in HDFS.

2. Judging from the error log

The error boils down to a failure to deserialize the MySQL binlog, which is hard to square with the guess above.
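The MissingTableMapEventException at the bottom of the stack trace is actually the key hint. In the binlog, every group of row events is preceded by a TABLE_MAP event that maps a numeric table id to a table's schema; a reader that resumes from an offset *inside* such a group sees row events whose table id it cannot resolve. The toy sketch below (my own illustration, not Debezium's actual code) shows that invariant:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of why reading a binlog "within the logical event group"
// fails: a WRITE_ROWS event carries only a numeric table id, and the schema
// for that id is announced by the TABLE_MAP event preceding it. Resume from
// the wrong offset and the map entry is missing.
public class TableMapDemo {
    private final Map<Long, String> tableMap = new HashMap<>();

    // TABLE_MAP event: registers table id -> table name/schema.
    public void onTableMap(long tableId, String tableName) {
        tableMap.put(tableId, tableName);
    }

    // WRITE_ROWS event: must resolve a table id registered earlier.
    public String onWriteRows(long tableId) {
        String table = tableMap.get(tableId);
        if (table == null) {
            throw new IllegalStateException(
                "No TableMapEventData has been found for table id: " + tableId);
        }
        return table;
    }

    public static void main(String[] args) {
        // Reader that saw the whole event group: resolves fine.
        TableMapDemo reader = new TableMapDemo();
        reader.onTableMap(1800034L, "test.cdc_table");
        System.out.println(reader.onWriteRows(1800034L));

        // Reader that resumed *after* the TABLE_MAP event sees only the
        // WRITE_ROWS half of the group and fails:
        TableMapDemo resumed = new TableMapDemo();
        try {
            resumed.onWriteRows(1800034L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, a binlog-offset problem on restore would produce exactly this deserialization error, so the two observations are not contradictory after all.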

3. Searched the Flink CDC community's issues and found no similar error
4. Checking the Flink CDC project's release notes, version 2.0.1 fixed a relevant problem (item 2)

```text
Improvements and Bug Fixes
1.[postgres] Fix Validator didn't implement Serializable
2.[mysql] Correct the initial binlog offset for MySqlHybridSplitAssigner
3.[mysql] Optimize the checkpoint be optional under single parallelism
4.[postgres] Fix postgres-cdc connector cannot recognize the optional option 'slot.name'
5.[mysql] Improve the code format in SignalEventDispatcher
6.[mysql] Add default value for 'database.history.instance.name' in MySqlParallelSource
7.[mysql] Add tests to check mysql-cdc works well under various timezones
8.[common] Remove useless parameter 'converter.schemas.enable'
9.[build] Run integration tests for each building
10.[changelog] fix changelog-deserialize exception message typo
11.[docs] Add FAQ for MySQL 8.0 Public Key Retrieval setting
12.[docs] Update the debezium document link to version 1.5
13.[docs] Add checkpoint and primary key setting for example in tutorials
```

Item 2 of the 2.0.1 release corrects the initial binlog offset for MySqlHybridSplitAssigner, i.e. exactly the kind of wrong resume offset that makes the reader start "within the logical event group" and triggers the deserialization error in the log. That settles the diagnosis.

7. Solution

Upgrade Flink CDC from version 2.0.0 to 2.0.2.
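Assuming the connector is pulled in via Maven (the `com.ververica` coordinates used by Flink CDC 2.x; verify the exact coordinates against your build), the upgrade is a one-line version bump:

```xml
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- was 2.0.0; 2.0.1+ corrects the binlog offset restored from checkpoints -->
    <version>2.0.2</version>
</dependency>
```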

Originally published 2021-12-14 via the 857Hub WeChat official account.
