集成数据到 CKafka

最近更新时间:2025-07-29 10:34:42

我的收藏

实现原理

CKafka 连接器提供 MQTT Source Plugin, 通过 MQTT 共享订阅, 将消息发送到 CKafka 集群。共享订阅允许配置较大的并发度, 满足 Kafka 连接大数据生态需求。


数据映射

MQTT 消息在转换为 Kafka Record 时, 映射关系如下:


MQTT Message

一条 MQTT 消息由三部分组成: 系统字段、用户属性、Payload 参考: MQTT Control Packet format

系统字段

字段名称
语义
Packet ID
控制指令 ID, 不唯一, 快速复用详见 Spec 2.2.1
Duplicated
详见Spec 3.3.1.1
QoS
详见Spec 3.3.1.2
Retained
详见Spec 3.3.1.3
Message ID
扩展字段, 唯一消息编号
Publisher Client ID
扩展字段, 发布消息的客户端标识符
Publisher Client Host
扩展字段, 发布消息的客户端 IP

User Properties

用户指定的键值对列表, 参见 Spec 3.3.2.3.7

Kafka Record

字段
语义
Key
记录的键值, 可选
Headers
记录关联键值对, 常用来存储元数据, 比如 Content Type、事件时间等,可选
Payload
记录的真正负载数据, 消息体

Headers 使用场景

Message 路由
元数据存储描述
链路追踪 和 日志
定制化业务处理
安全认证
消息优先级
互操作性/兼容性指令
流处理