集成数据到 CKafka

最近更新时间:2025-10-20 16:56:02

我的收藏


实现原理

CKafka 连接器内置 MQTT Source Plugin,通过 MQTT 共享订阅机制,实时接入 MQTT 消息并转发至 CKafka 集群。该共享订阅模式支持高并发配置,可有效保障数据传输吞吐量,充分满足 Kafka 与大数据生态集成时对高流量接入和处理能力的需求。


数据映射

MQTT 消息在转换为 Kafka Record 时, 映射关系如下:


MQTT Message

一条 MQTT 消息由三部分组成: 系统字段、用户属性、Payload 参考: MQTT Control Packet format

系统字段

字段名称
语义
Packet ID
控制指令 ID, 不唯一, 快速复用详见 Spec 2.2.1
Duplicated
详见Spec 3.3.1.1
QoS
详见Spec 3.3.1.2
Retained
详见Spec 3.3.1.3
Message ID
扩展字段, 唯一消息编号
Publisher Client ID
扩展字段, 发布消息的客户端标识符
Publisher Client Host
扩展字段, 发布消息的客户端 IP

User Properties

用户指定的键值对列表, 参见 Spec 3.3.2.3.7

Kafka Record

字段
语义
Key
记录的键值, 可选
Headers
记录关联键值对, 常用来存储元数据, 比如 Content Type、事件时间等,可选
Payload
记录的真正负载数据, 消息体

Headers 使用场景

Message 路由
元数据存储描述
链路追踪和日志
定制化业务处理
安全认证
消息优先级
互操作性/兼容性指令
流处理

业务应用场景

智慧城市与交通数字孪生​​
实时采集城市多源交通数据(如车辆车牌、速度、行驶轨迹),通过 MQTT 主题上报,并借助 Kafka 连接器接入大数据生态。
支持基于车牌号等属性进行高效检索与分析(如车辆轨迹还原),为交通监控、调度和仿真提供数据支撑。

特性与优势

消息队列 CKafka 版是一个分布式、高吞吐量、高可扩展性的消息系统,然其本身并非专为边缘物联网通信场景设计,其客户端通常需要稳定的网络环境和较高的硬件资源,而物联网领域中海量的设备和应用产生的数据往往通过轻量级的 MQTT 协议进行传输。通过 ​​CKafka MQTT 连接器​​实现 MQTT 协议与 CKafka 生态的无缝集成,将设备端发布的 MQTT 消息实时流入 CKafka 主题,确保数据能够被实时处理、存储和进一步分析。该集成不仅保留了 MQTT 在弱网与低资源环境下的通信优势,还充分发挥 CKafka 在高吞吐、高可靠及生态兼容性方面的能力,真正实现了物联网数据与大数据系统间的灵活、稳定和高效整合。

操作步骤

策略与权限

1. 登录 消息队列 MQTT 版控制台,进入集群详情页面,确认当前 MQTT 集群是否开启授权策略管理。
1.1 若未开启权限策略,数据面资源暂无权限管理,可以使用任意“用户名+密码”进行连接、生产和消费等操作,详情见 授权策略。在此情况下,进行数据集成到CKafka的操作时无需其他额外配置,但是由于缺少权限管控,会存在一定的数据安全风险。
1.2 若已开启权限策略,请按照下文所述步骤进行操作。
2. 进入 认证管理 > 用户名和密码,单击新建用户 ,为数据集成任务创建专用账号和密码,用户名为ckafka_connector,并在说明中注明此用户为仅用于 MQTT 与 CKafka 数据集成任务,如图所示。

3. 进入 授权策略管理 页面,单击新建授权策略,强烈建议在本策略中明确授权上一步所创建的 CKafka 数据集成专用账号,以实现精确的权限控制。具体配置方式可参考下图,其余字段请结合实际需求填写,详情参考 授权策略


配置 CKafka 连接器

1. 登录 消息队列 CKafka 版控制台,进入连接列表页面,首先在页面最上方确认连接的所属地域。
2. 单击新建连接,进行连接器的创建

3. 按下图步骤进行连接信息的选择,连接类型选择 MQTT 集群,单击下一步进入连接配置页面。

4. 输入连接名称、描述等基本信息,并在下拉框中选择目标 MQTT 集群。此处的用户名和密码用作连接认证,是在 MQTT 集群中创建的数据集成专用账号密码,详见 策略和权限 小节。单击下一步进入连接校验过程。

5. 当校验均通过后,连接即创建成功。您可在 CKafka 控制台 > 连接器 > 连接列表中查看新增的连接。
对于已创建的连接,连接列表中将展示其基本信息,包括 ID、名称、状态、连接类型、绑定资源、资源所属地域、关联任务数、创建时间、描述等。
单击操作列中的编辑按钮,可修改连接配置。更新连接后,系统会默认开启“更新并重启所有关联任务”的开关,请在操作时根据实际业务需求谨慎选择;
单击操作列中的删除按钮将删除此连接。


创建数据集成任务

前提条件

在 MQTT 集群的同一地域下,已有创建好的 CKafka 实例,具体操作详情参阅消息队列 CKafka 快速入门

任务创建

1. 进入 CKafka 控制台 > 连接器 > 任务列表,单击左上角新建任务,进行任务相关信息填写,任务类型选择 数据接入 > MQTT集群,单击下一步进入数据源配置。

2. 下拉框中选择合适的连接,若没有合适选项,可单击下方跳转按钮,进入新建连接步骤;输入订阅的 Topic,若订阅多个主题请用“,”隔开。

3. 进行数据目标配置,确定分发策略以及目标CKafka实例。单击提交完成任务创建。

4. 当任务创建成功后,MQTT 集群下会自动新建一个共享订阅组,用于执行数据集成。

也可前往 客户端管理 页面查看执行该任务的连接器客户端详情。