数据转发到消息队列 CKAFKA

最近更新时间:2024-09-30 11:39:21

我的收藏

概述

规则引擎支持用户配置规则将符合条件的设备上报数据转发到 消息队列 CKAFKA (以下简称 CKAFKA ),用户的应用服务器再从 CKAFKA 中读取数据内容进行处理。以此利用 CKAFKA 高吞吐量的优势,为用户打造高可用性的消息链路。

配置

操作指南 创建规则和筛选数据,再添加行为操作,选择转发到 CKAFKA。
说明:
第一次使用时会提示用户授权访问 CKAFKA,您需单击授权访问 CKAFKA才能继续创建。


1. 在弹出的“添加规则”窗口,选择行为“数据转发到消息队列(CKAFKA)”;依次选择 CKAFKA 实例和 Topic,单击保存即可。
avatar


2. 完成以上配置后,物联网通信平台会将符合规则条件的设备上报数据转发至用户配置的 CKAFKA 。您可以参考 创建实例和 Topic 文档,在应用服务器上读取数据并进行处理。

数据格式

消息类型包括设备上报的 Topic 消息和平台监测到设备状态变化的通知消息,两类消息转发成功后,CKAFKA 收到的数据格式不同,如下:
设备上报的 Topic 消息:
转发成功后,CKAFKA 收到的数据格式如下:
{
"MsgType": "Forward",
"Event": "",
"Topic": "7PQ0I75ZWY/dev_01/event",
"Seq": 32569,
"PayloadLen": 44,
"ProductId": "7PQ0I75ZWY",
"DeviceName": "dev_01",
"Payload": "eyJkZXZpY2VfaW5mIjoiY2FyX2RldmljZSIsInRlbXBlcmF0dXJlIjoxOX0=",
"Time": "2022-08-11 19:17:24.943",
"TimeMills": 1660216644943,
"Reason": ""
}
各字段含义如下:
MsgType:取值为“Forward”。
Topic:设备上报此消息时的 Topic。
Seq:序列号。
PayloadLen:此设备上报消息的长度。
ProductId:产品 ID。
DeviceName:设备名称。
Payload:Base64 解码后为设备上报的消息内容。
Time:转发行为触发的时间,如 "2022-08-11 12:00:00"。
TimeMills:转发行为触发的时间戳,毫秒级别。
设备状态变化通知:
当平台监测到设备状态变化后,会触发此消息转发,转发成功后,收到的数据格式如下:
{
"MsgType": "Forward",
"Event": "",
"Topic": "$state/report/K72CRAIG98/pskDevice001",
"Seq": 0,
"PayloadLen": 178,
"ProductId": "K72CRAIG98",
"DeviceName": "pskDevice001",
"Payload": "eyJkZXZpY2VOYW1lIjoicHNrRGV2aWNlMDAxIiwiZXZlbnQiOiJFVl9PTkxJTkUiLCJwcm9kdWN0SUQiOiJLNzJDUkFJRzk4IiwicmVhc29uIjoiUkVBU09OX0RFVklDRV9DT05ORUNUIiwidGltZXN0YW1wIjoxNjc2OTY1MzUxLCJ0b3BpYyI6IiRzdGF0ZS9yZXBvcnQvSzcyQ1JBSUc5OC9wc2tEZXZpY2UwMDEifQ==",
"Time": "2023-02-21 15:42:31",
"TimeMills": 1676965351605,
"Reason": ""
}
Payload 进行 Base64 解码后:
{
"deviceName": "dev_01",
"event": "EV_OFFLINE",
"productID": "KXUCF9GJ9H",
"reason": "REASON_DEVICE_DISCONNECT",
"timestamp": 1677068839,
"topic": "$state/report/KXUCF9GJ9H/dev_01"
}
各字段含义如下:
event:"EV_ONLINE"上线,"EV_OFFLINE"下线。
reason 设备状态变化原因:
"REASON_DEVICE_DISCONNECT":设备断连。
"REASON_STATE_KICKED":服务端主动踢下线。
"REASON_DEVICE_KICKED":设备端互踢下线。
"REASON_KEEPALIVE_TIMEOUT":设备端超时断连。
productID:产品 ID。
deviceName:设备名称。
timestamp:时间戳。
topic 主题信息。

重发机制

重发机制用于在消息转发过程中发生失败的情况下,进行再次重发以达到接收消息的目的,具体说明如下:
若消息转发失败,系统则会进行转发重试,重试按照1s、3s、10s的时间间隔依次进行,若三次重试均失败,则将消息丢弃掉。
若用户配置了“转发错误行为操作”,在三次重试失败后,将按“转发错误行为操作”的配置,再进行一次消息转发,如果仍失败,则将消息丢弃掉。