专栏首页SmartSiKafka Connect JDBC Source MySQL 增量同步

Kafka Connect JDBC Source MySQL 增量同步

Kafka 版本:2.4.0

上一篇文章 Kafka Connect JDBC Source MySQL 全量同步 中,我们只是将整个表数据导入 Kafka。这对于获取数据快照很有用,但并不是所有场景都需要批量全部同步,有时候我们可能想要获取自上次之后发生的变更以实现增量同步。JDBC Connector 提供了这样的能力,将表中自上次轮询以来发生更改的行流式传输到 Kafka 中。可以基于递增的列(例如,递增的主键)或者时间戳列(例如,上次更新的时间戳)来进行操作。Kafka Connect JDBC Source 提供了三种增量同步模式:

  • incrementing
  • timestamp
  • timestamp+incrementing

下面我们详细介绍每一种模式。

1. incrementing 模式

CREATE TABLE IF NOT EXISTS `stu`(
   `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
   `stu_id` VARCHAR(100) NOT NULL,
   `stu_name` VARCHAR(100) NOT NULL,
   `gmt_create` DATETIME NULL DEFAULT CURRENT_TIMESTAMP,
   `gmt_modified` DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id` )
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

incrementing 模式基于表上严格递增的列来检测是否是新行。如果添加了具有新 ID 的新行,该行会被导入到 Kafka 中。需要使用 incrementing.column.name 参数指定严格递增列。如下所示使用 id 字段作为自增列:

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" -d '{
    "name": "jdbc_source_connector_mysql_increment",
    "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://localhost:3306/kafka_connect_sample",
            "connection.user": "root",
            "connection.password": "root",
            "topic.prefix": "connect-mysql-increment-",
            "mode":"incrementing",
            "incrementing.column.name":"id",
            "catalog.pattern" : "kafka_connect_sample",
            "table.whitelist" : "stu"
        }
    }'

创建 Connector 成功之后如下显示:

在 incrementing 模式下,每次都是根据 incrementing.column.name 参数指定的列,查询大于自上次拉取的最大id:

SELECT * FROM stu
WHERE id > ?
ORDER BY id ASC

现在我们向 stu 数据表新添加 stu_id 分别为 00001 和 00002 的两条数据:

我们在使用如下命令消费 connect-mysql-increment-stu Topic 时,会连续得到两条记录,如下图所示:

bin/kafka-console-consumer.sh --topic connect-mysql-increment-stu --from-beginning --bootstrap-server localhost:9092

这种模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)的变更,因为无法增大该行的 id。如下所示我们删除了 stud_id 为 00002 的行、修改了 stud_id 为 00001 的行以及新添加了 stu_id 为 00003 的行:

但是只有新添加的 stu_id 为 00003 的行导入了 kafka:

2. timestamp 模式

CREATE TABLE IF NOT EXISTS `stu_timestamp`(
   `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
   `stu_id` VARCHAR(100) NOT NULL,
   `stu_name` VARCHAR(100) NOT NULL,
   `gmt_create` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `gmt_modified` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id` )
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

timestamp 模式基于表上时间戳列来检测是否是新行或者修改的行。该列最好是随着每次写入而更新,并且值是单调递增的。需要使用 timestamp.column.name 参数指定时间戳列。如下所示使用 gmt_modified 字段作为时间戳列:

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" -d '{
    "name": "jdbc_source_connector_mysql_timestamp",
    "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://localhost:3306/kafka_connect_sample",
            "connection.user": "root",
            "connection.password": "root",
            "topic.prefix": "connect-mysql-timestamp-",
            "mode":"timestamp",
            "timestamp.column.name":"gmt_modified",
            "catalog.pattern" : "kafka_connect_sample",
            "table.whitelist" : "stu_timestamp"
        }
    }'

需要注意的是时间戳列在数据表中不能设置为可 NULL,否则抛出如下异常:

org.apache.kafka.connect.errors.ConnectException: Cannot make incremental queries using timestamp columns
[gmt_modified] on `kafka_connect_sample`.`stu_timestamp` because all of these columns nullable.
	at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:497)
	at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:168)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

请注意,因为 Connector 要求时间戳列为 NOT NULL,我们可以将这些列设置为 NOT NULL,或者我们可以通过设置 validate.not.null 为 false 来禁用此验证。

创建 Connector 成功之后如下显示:

在 timestamp 模式下,每次都是根据 timestamp.column.name 参数指定的列,查询大于自上次拉取成功的 gmt_modified:

SELECT * FROM stu_timestamp
WHERE gmt_modified > ? AND gmt_modified < ?
ORDER BY gmt_modified ASC

现在我们向 stu_timestamp 数据表新添加 stu_id 分别为 00001 和 00002 的两条数据:

导入到 Kafka connect-mysql-increment-stu_timestamp Topic 中的记录如下图所示:

这种模式可以捕获行上 UPDATE 变更,同样也不能捕获 DELETE 变更:

只有更新的行导入了 kafka:

这种模式的缺点是可能造成数据的丢失。由于时间戳列不是唯一列字段,可能存在相同时间戳的两列或者多列,假设在导入第二条的过程中发生了崩溃,在恢复重新导入时,拥有相同时间戳的第二条以及后面几条数据都会丢失。这是因为第一条导入成功后,对应的时间戳会被记录已成功消费,恢复后会从大于该时间戳的记录开始同步。此外,也需要确保时间戳列是随着时间递增的,如果人为的修改时间戳列小于当前同步成功的最大时间戳,也会导致该变更不能同步。

3. timestamp+incrementing 混合模式

CREATE TABLE IF NOT EXISTS `stu_timestamp_inc`(
   `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
   `stu_id` VARCHAR(100) NOT NULL,
   `stu_name` VARCHAR(100) NOT NULL,
   `gmt_create` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `gmt_modified` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id` )
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

如上所述,仅使用 incrementing 或 timestamp 模式都存在缺陷。将 timestamp 和 incrementing 一起使用,可以充分利用 incrementing 模式不丢失数据的优点以及 timestamp 模式捕获更新操作变更的优点。需要使用 incrementing.column.name 参数指定严格递增列、使用 timestamp.column.name 参数指定时间戳列。如下所示使用 id 字段作为自增列、gmt_modified 字段作为时间戳列的示例:

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" -d '{
    "name": "jdbc_source_connector_mysql_timestamp_inc",
    "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://localhost:3306/kafka_connect_sample",
            "connection.user": "root",
            "connection.password": "root",
            "topic.prefix": "connect-mysql-timestamp-inc-",
            "mode":"timestamp+incrementing",
            "timestamp.column.name":"gmt_modified",
            "incrementing.column.name":"id",
            "catalog.pattern" : "kafka_connect_sample",
            "table.whitelist" : "stu_timestamp_inc"
        }
    }'

创建 Connector 成功之后如下显示:

在 timestamp+incrementing 模式下,需要根据自增列 id 和时间戳列 gmt_modified 一起来决定拉取哪些数据:

SELECT * FROM stu_timestamp_inc
WHERE gmt_modified < ?
  AND ((gmt_modified = ? AND id > ?) OR gmt_modified > ?)
ORDER BY gmt_modified, id ASC

现在我们向 stu_timestamp_inc 数据表新添加 stu_id 分别为 00001 和 00002 的两条数据:

导入到 Kafka connect-mysql-timestamp-inc-stu_timestamp_inc Topic 中的记录如下图所示:

这种模式可以捕获行上 UPDATE 变更,还是也不能捕获 DELETE 变更:

只有更新的行导入了 kafka:

4. 总结

incrementing 模式的缺点是无法捕获行上更新操作(例如,UPDATE、DELETE)、timestamp 模式存在丢数据的风险。timestamp+incrementing 混合模式充分利用了各自的优点,做到既能捕捉 UPDATE 操作变更,也能做到不丢数据。这三种模式对开发者比较友好,易配置和使用,但这三种模式还存在一些问题:

  • 无法获取 DELETE 操作变更,因为这三种模式都是使用 SELECT 查询来检索数据,并没有复杂的机制来检测已删除的行。
  • 由于最需要增量时间戳,处理历史遗留数据时需要额外添加时间戳列。如果无法更新 Schema,则不能使用本文中的模式。
  • 因为需要不断地运行查询,因此会对数据库产生一些负载。

参考:

相关推荐:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Kafka Connect JDBC Source MySQL 全量同步

    从数据库获取数据到 Apache Kafka 无疑是 Kafka Connect 最流行的用例。Kafka Connect 提供了将数据导入和导出 Kafka ...

    smartsi
  • Kafka核心API——Connect API

    Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进...

    端碗吹水
  • 通过kafka/flink加载MySQL表数据消费 快速安装配置

    说明:对于数据迁移工具来说,好多封装了kafka和flink的,出于好奇,个人试着去下载了一下kafka和flink试着部署一下,本次就简单的记录一下安装过程,...

    用户7689089
  • 在confluent上测试connect source和sink

    为了实现分库分表前期的安全操作, 希望分表的数据还是能够暂时合并到原表中, 使用基于kafka connect实现, debezium做connect sour...

    XING辋
  • 使用kafka连接器迁移mysql数据到ElasticSearch

    把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合...

    用户7634691
  • Mysql实时数据变更事件捕获kafka confluent之debezium

    如果你的后端应用数据存储使用的MySQL,项目中如果有这样的业务场景你会怎么做呢?

    XING辋
  • 【Kafka】核心API

    虚拟化软件推荐 VM https://www.cnblogs.com/PrayzzZ/p/11330937.html VirtualBOX

    瑞新
  • Flink + Debezium CDC 实现原理及代码实战

    Debezium 是一个分布式平台,它将现有的数据库转换为事件流,应用程序消费事件流,就可以知道数据库中的每一个行级更改,并立即做出响应。

    kk大数据
  • 07 Confluent_Kafka权威指南 第七章: 构建数据管道

    当人们讨论使用apache kafka构建数据管道时,他们通常会应用如下几个示例,第一个就是构建一个数据管道,Apache Kafka是其中的终点。丽日,从ka...

    冬天里的懒猫
  • MLSQL v1.1.7 Release roadmap

    MLSQL v1.1.7 plans to release in Mid Jan 2019, this version will take almost thr...

    用户2936994
  • 实时数仓|基于Flink1.11的SQL构建实时数仓探索实践

    实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型...

    Spark学习技巧
  • kafka 连接器实现 Mysql 数据同步 Elasticsearch

    Mysql 作为传统的关系型数据库,主要面向 OLTP,性能优异,支持事务,但是在一些全文检索,复杂查询上面并不快。Elasticsearch 底层基于 Luc...

    Se7en258
  • 数据同步工具之FlinkCDC/Canal/Debezium对比

    数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的...

    大数据真好玩
  • 数据同步工具之FlinkCDC/Canal/Debezium对比

    数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的...

    王知无-import_bigdata
  • Kafka生态

    Confluent提供了业界唯一的企业级事件流平台,Confluent Platform通过将来自多个源和位置的数据集成到公司的单个中央事件流平台中,可以轻松构...

    35岁程序员那些事
  • 009.统一数据采集平台DBus-0.6.1安装部署

    我的环境已经安装了Ambari-2.7.4.0+HDP-3.1.4.0大数据平台,已安装的组件的版本如下:

    CoderJed
  • Debezium-Flink-Hudi:实时流式CDC

    Debezium是一个开源的分布式平台,用于捕捉变化数据(change data capture)的场景。它可以捕捉数据库中的事件变化(例如表的增、删、改等),...

    zhisheng
  • Flink on TiDB —— 便捷可靠的实时数据业务支撑

    本文由网易互娱计费数据中心实时业务负责人林佳老师分享,主要介绍网易数据中心在处理实时业务时为什么选择 Flink 和 TiDB,以及两者的结合应用情况。

    PingCAP
  • Apache Sqoop 将mysql导入到Hadoop HDFS

    第 21 章 Apache Sqoop 目录 21.1. 安装 Sqoop 21.2. sqoop2-tool 21.2.1. verify 21.2.2. u...

    netkiller old

扫码关注云+社区

领取腾讯云代金券