
Using the Flink CDC SQL API to capture MySQL binlog changes in real time and send them to Kafka
| Framework | Version |
|---|---|
| Flink | 1.13.2 |
| MySQL | 5.7.25 |
| connector-mysql-cdc | 2.0.0 |
```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCWithSqlTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // checkpoint every 2 minutes with exactly-once semantics
        env.enableCheckpointing(120000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // keep the externalized checkpoint on HDFS even when the job is cancelled
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // source: read the MySQL binlog via the mysql-cdc connector
        String sourceDDL = "CREATE TABLE cdc_from_mysql (" +
                "  id INT" +
                "  ,name STRING" +
                "  ,PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'server-time-zone' = 'Asia/Shanghai'," +
                "  'scan.incremental.snapshot.enabled' = 'true'," +
                "  'hostname' = 'mysql_ip'," +
                "  'port' = 'mysql_port'," +
                "  'username' = 'mysql_username'," +
                "  'password' = 'mysql_password'," +
                "  'database-name' = 'mysql_database'," +
                "  'table-name' = 'mysql_table'," +
                "  'server-id' = '5400'" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // sink: write the changelog to Kafka via the upsert-kafka connector
        String sinkDDL = "CREATE TABLE cdc_to_kafka (" +
                "  id INT" +
                "  ,name STRING" +
                "  ,PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'ZGN_CDC_TEST'," +
                "  'properties.bootstrap.servers' = 'kafka_ip:9092'," +
                "  'key.json.ignore-parse-errors' = 'true'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'," +
                "  'value.fields-include' = 'ALL'" +
                ")";
        tableEnv.executeSql(sinkDDL);

        tableEnv.executeSql("INSERT INTO cdc_to_kafka SELECT * FROM cdc_from_mysql");
    }
}
```

After submitting the job, insert a few rows into the monitored MySQL table:

| id | name |
|---|---|
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
Consuming the Kafka topic confirms that the three changes arrived:

```shell
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST
{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
```

I then cancelled the job directly from the Flink Web UI.
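Because the checkpoint is retained on cancellation, the job can be resubmitted from it. A sketch of the CLI steps, assuming the standard Flink command line; the job id, `chk-N` directory, and jar name below are placeholders, not values from this setup:

```shell
# cancel the running job (job id is a placeholder)
flink cancel <job-id>

# resume from the retained externalized checkpoint: pick the latest chk-N
# directory under the configured HDFS checkpoint prefix (path is a placeholder)
flink run -s hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/<job-id>/chk-N \
  -c CDCWithSqlTest flink-cdc-test.jar
```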
After restarting the job, insert three more rows into the MySQL table:

| id | name |
|---|---|
| 1 (added previously) | 1 (added previously) |
| 2 (added previously) | 2 (added previously) |
| 3 (added previously) | 3 (added previously) |
| 4 (added this time) | 4 (added this time) |
| 5 (added this time) | 5 (added this time) |
| 6 (added this time) | 6 (added this time) |
```shell
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST
{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}
---------------- boundary between stopping and restarting the job ----------------
{"id":4,"name":"4"}
{"id":5,"name":"5"}
{"id":6,"name":"6"}
{"id":4,"name":"4"}   -- problem: duplicate consumption
{"id":5,"name":"5"}   -- problem: duplicate consumption
{"id":6,"name":"6"}   -- problem: duplicate consumption
{"id":4,"name":"4"}   -- problem: duplicate consumption
{"id":5,"name":"5"}   -- problem: duplicate consumption
{"id":6,"name":"6"}   -- problem: duplicate consumption
{"id":4,"name":"4"}   -- problem: duplicate consumption
{"id":5,"name":"5"}   -- problem: duplicate consumption
{"id":6,"name":"6"}   -- problem: duplicate consumption
```
The triplet keeps repeating until the job finally fails with:

```
java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
......
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:1800034. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
......
```

The expectation was that `{"id":4,"name":"4"}`, `{"id":5,"name":"5"}` and `{"id":6,"name":"6"}` would each be consumed exactly once, yet in reality they kept being consumed over and over, so my first suspicion was that the restarted Flink CDC job could not correctly interpret the checkpoint information stored in HDFS. The error itself, however, is mainly about failing to deserialize the MySQL binlog, which is hard to reconcile with that guess.
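The `MissingTableMapEventException` message actually points at the connection between the two: in the MySQL binlog, a row event such as `EXT_WRITE_ROWS` carries only a table id, and the schema needed to decode it is supplied by a `TABLE_MAP` event earlier in the same logical event group. If the reader resumes from an offset that lands *inside* a group, the `TABLE_MAP` has already been skipped and decoding must fail. A toy sketch of that invariant (the event names and the "XID ends the group" simplification are illustrative, not the real Debezium decoder):

```java
import java.util.List;

public class BinlogGroupCheck {
    /**
     * Toy model of the binlog reader's invariant: an (EXT_)WRITE_ROWS event can
     * only be decoded if a TABLE_MAP event was seen earlier in the same event
     * group (here a group is simply terminated by the transaction-commit XID).
     */
    public static boolean canDeserialize(List<String> events) {
        boolean haveTableMap = false;
        for (String event : events) {
            switch (event) {
                case "TABLE_MAP":
                    haveTableMap = true;
                    break;
                case "EXT_WRITE_ROWS":
                    if (!haveTableMap) {
                        // where the real reader throws MissingTableMapEventException
                        return false;
                    }
                    break;
                case "XID": // commit: the event group is over
                    haveTableMap = false;
                    break;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // resuming at the start of a group decodes fine
        System.out.println(canDeserialize(List.of("TABLE_MAP", "EXT_WRITE_ROWS", "XID")));
        // resuming in the middle of a group (a wrong offset restored from the
        // checkpoint) has skipped the TABLE_MAP, so decoding fails
        System.out.println(canDeserialize(List.of("EXT_WRITE_ROWS", "XID")));
    }
}
```

So a checkpoint that records a binlog offset pointing into the middle of an event group would produce exactly this failure on restart.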
Searching the Flink CDC 2.0.1 release notes turns up the following under "Improvements and Bug Fixes":

1. [postgres] Fix Validator didn't implement Serializable
2. [mysql] Correct the initial binlog offset for MySqlHybridSplitAssigner
3. [mysql] Optimize the checkpoint be optional under single parallelism
4. [postgres] Fix postgres-cdc connector cannot recognize the optional option 'slot.name'
5. [mysql] Improve the code format in SignalEventDispatcher
6. [mysql] Add default value for 'database.history.instance.name' in MySqlParallelSource
7. [mysql] Add tests to check mysql-cdc works well under various timezones
8. [common] Remove useless parameter 'converter.schemas.enable'
9. [build] Run integration tests for each building
10. [changelog] fix changelog-deserialize exception message typo
11. [docs] Add FAQ for MySQL 8.0 Public Key Retrieval setting
12. [docs] Update the debezium document link to version 1.5
13. [docs] Add checkpoint and primary key setting for example in tutorials

Version 2.0.1 fixed the binlog change-event deserialization exception, which matches the error in our logs exactly, so the root-cause hunt ends here.
The fix, then, is to upgrade the Flink CDC connector from 2.0.0 to 2.0.2.
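Assuming the connector is pulled in through Maven, the upgrade is a one-line version bump. The coordinates below are the `com.ververica` ones used by the Flink CDC 2.0.x releases; verify them against your own build file:

```xml
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.2</version>
</dependency>
```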