有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

数据订阅 Kafka 版(当前 Kafka Server 版本为V2.6.0)中,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据, 本文为您提供了消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。

注意事项

Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
目前不支持通过外网连接数据订阅的 Kafka 进行消费,只支持腾讯云内网的访问,并且订阅的数据库实例所属地域与数据消费的地域相同。
在订阅指定库/表对象(非源实例全部),并且采用 Kafka 单分区的场景中,DTS 解析增量数据后,仅将订阅对象的数据写入 Kafka Topic 中,其他非订阅对象的数据会转成空事务写入 Kafka Topic,所以在消费数据时会出现空事务。空事务的 Begin/Commit 消息中保留了事务的 GTID 信息,可以保证 GTID 的连续性和完整性。
为了保证数据可重入,DTS 订阅引入 Checkpoint 机制。消息写入 Kafka Topic 时,一般每10秒会插入一个 Checkpoint,用来标识数据同步的位点,在任务中断后再重启识别断点位置,实现断点续传。另外,消费端遇到 Checkpoint 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。

消费 Demo 下载

TDSQL PostgreSQL 数据订阅当前仅支持 ProtoBuf 的数据格式,如下 Demo 中已包含 Protobuf 协议文件,无需另外下载。如果您选择自行下载 Protobuf 协议文件,请使用 Protobuf 3.X 版本进行代码生成,以便数据结构可以正确兼容。
TDSQL PostgreSQL 消费 Demo 的逻辑说明与 MySQL 的类似,请参考 MySQL Demo 说明
Demo 语言
ProtoBuf Demo 下载
Go
地址
Java
地址
Python
地址

Java Demo 操作步骤

编译环境:Maven 或者 Gradle 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。 运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 JRE8。 操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 创建 TDSQL PostgreSQL 版数据订阅
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Java Demo ,然后解压该文件。
4. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。 使用 Maven 进行打包:mvn clean package。
5. 运行 Demo。 使用 Maven 打包后,进入目标文件夹 target ,运行如下代码。java -jar consumerDemo-avro-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql
brokers 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。
groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看。
trans2sql 表示是否转换为 SQL 语句,java 代码中,携带该参数表示转换为 SQL 语句,不携带则不转换。
说明
携带 trans2sql 时,将使用 javax.xml.bind.DatatypeConverter.printHexBinary() 将 byte 值转成16进制,请使用 JDK1.8 版本及以上避免不兼容。如果不需要转 SQL,可以注释此处代码。
6. 观察消费情况。



Golang Demo 操作步骤

编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。 运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址)。 操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Golang Demo,然后解压该文件。
4. 进入解压后的目录,运行 go build -o subscribe ./main/main.go,生成可执行文件 subscribe。
5. 运行如下代码:./subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true
brokers 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。
groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看。
trans2sql 表示是否转换为 SQL 语句。
6. 观察消费情况。



Python3 Demo 操作步骤

编译运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 Python3,pip3(用于依赖包安装)。 使用 pip3 安装依赖包:
pip install flag
pip install kafka-python
pip install avro
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版
2. 创建一个或多个消费组,详情请参见 新增消费组
3. 下载 Python3 Demo ,然后解压该文件。
4. 运行如下代码:python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1
brokers 为数据订阅 Kafka 的内网访问地址,topic 为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。
groupuserpassword 分别为消费组的名称、账号和密码,可在 消费管理 页查看。
trans2sql 表示是否转换为 SQL 语句。
5. 观察消费情况。