操作场景
数据订阅 Kafka 版(当前 Kafka Server 版本为V2.6.0)中,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据, 本文为您提供了消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。
注意事项
Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
目前不支持通过外网连接数据订阅的 Kafka 进行消费,只支持腾讯云内网的访问,并且订阅的数据库实例所属地域与数据消费的地域相同。
消费 Demo 下载
MongoDB 数据订阅当前仅支持 JSON 数据格式,如下 Demo 示例中已包含 JSON 协议文件,无需另外下载。
Java Demo 操作步骤
编译环境:Maven 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。
运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 JRE8。
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 新建 MongoDB 数据订阅。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Java Demo ,然后解压该文件。
4. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。
使用 Maven 进行打包:mvn clean package。
5. 运行 Demo。
使用 Maven 打包后,进入目标文件夹 target ,运行如下代码:
java -jar consumerDemo-json-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql --trans2canal
brokers
为数据订阅 Kafka 的内网访问地址,topic
为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。group
、user
、password
分别为消费组的名称、账号和密码,可在 消费管理 页查看,trans2sql
表示是否转换为 SQL 语句,java 代码中,携带该参数表示转换为 SQL 语句,不携带则不转换。trans2canal
表示是否转换为 Canal 格式打印出来,携带该参数表示转换为 Canal 格式,不携带则不转换。(当前仅 JSON 数据格式涉及该参数)6. 观察消费情况。
![](https://qcloudimg.tencent-cloud.cn/image/document/650f91fcd10ccdccde59f9a245c64c4e.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/650f91fcd10ccdccde59f9a245c64c4e.png)
Golang Demo 操作步骤
编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。
运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址)。
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Golang Demo,然后解压该文件。
4. 进入解压后的目录,运行
go build -o subscribe ./main/main.go
,生成可执行文件 subscribe。5. 运行如下代码:
./subscribe --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=true
brokers
为数据订阅 Kafka 的内网访问地址,topic
为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。group
、user
、password
分别为消费组的名称、账号和密码,可在 消费管理 页查看。trans2sql
表示是否转换为 SQL 语句。6. 观察消费情况。
![](https://qcloudimg.tencent-cloud.cn/image/document/4d020cbfa91f8c9e85f11a887067905e.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/4d020cbfa91f8c9e85f11a887067905e.png)
Python3 Demo 操作步骤
编译运行环境:腾讯云服务器(需要与订阅实例相同地域,才能够访问到 Kafka 服务器的内网地址),安装 Python3,pip3(用于依赖包安装)。
使用 pip3 安装依赖包:
pip install flagpip install kafka-python
操作步骤如下:
1. 创建新版数据订阅任务,详情请参见 数据订阅 Kafka 版。
2. 创建一个或多个消费组,详情请参见 新增消费组。
3. 下载 Python3 Demo ,然后解压该文件。
4. 运行如下代码:
python main.py --brokers=xxx --topic=xxx --group=xxx --user=xxx --password=xxx --trans2sql=1
brokers
为数据订阅 Kafka 的内网访问地址,topic
为数据订阅任务的订阅 topic,这两个可在 订阅详情 页查看。group
、user
、password
分别为消费组的名称、账号和密码,可在 消费管理 页查看。trans2sql
表示是否转换为 SQL 语句。5. 观察消费情况。
![](https://qcloudimg.tencent-cloud.cn/image/document/dac810ca24246d7c212423c7828e9df9.png)
![](https://qcloudimg.tencent-cloud.cn/image/document/dac810ca24246d7c212423c7828e9df9.png)