前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Edge2AI之从边缘摄取数据

Edge2AI之从边缘摄取数据

作者头像
大数据杂货铺
发布2022-04-27 16:55:10
1.4K0
发布2022-04-27 16:55:10
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

在本次实操中,您将使用 MiNiFi 从边缘捕获数据并将其转发到 NiFi。

实验总结

  • 实验 1 - 在 Apache NiFi 上运行模拟器,将 IoT 传感器数据发送到 MQTT broker。
  • 实验 2 - 创建一个流以使用 Cloudera Edge Flow Manager 从 MQTT broker收集数据并将其发布到 MiNiFi 代理。
  • 实验 3 - 使用Cloudera Edge Flow Manager更新现有边缘流程并在边缘执行额外处理

实验 1 - Apache NiFi:设置机器传感器模拟器

在本实验中,您将运行一个简单的 Python 脚本来模拟来自一些假设的机器的 IoT 传感器数据,并将数据发送到 MQTT 代理 ( mosquitto )。MQTT 代理扮演网关的角色,通过“mqtt”协议连接到许多不同类型的传感器。您的集群附带模拟脚本发布到的嵌入式 MQTT 代理。为方便起见,我们将使用 NiFi 来运行脚本而不是 Shell 命令。

  1. 转到 Apache NiFi 并将处理器 (ExecuteProcess) 添加到画布。
  1. 右键单击处理器,选择配置(或者,只需双击处理器)。在属性选项卡上,设置如下所示的属性以运行我们的 Python 模拟脚本。
代码语言:javascript
复制
Command:           python3
Command Arguments: /opt/demo/simulate.py
  1. SCHEDULING选项卡中,设置为Run Schedule: 1 sec

或者,您可以将其设置为其他时间间隔:1 秒、30 秒、1 分钟等......

  1. Settings选项卡中
  2. AUTOMATICALLY TERMINATED RELATIONSHIP部分中的选中“success”关系
  3. 将处理器名称设置为“Generate Test Data
  4. 单击Apply
  1. 然后,您可以右键单击以start来启动此模拟器运行程序。
  1. 几秒钟后右键单击并选择Stop并查看数据出处。你会看到它已经运行了很多次并产生了结果。

实验 2 - 配置边缘流管理

Cloudera Edge Flow Management (EFM) 为您提供环境中所有 MiNiFi 代理的可视化总览,并允许您更新每个代理的流配置,并通过NiFi Registry集成进行版本控制。在本实验中,您将创建 MiNiFi 流并将其发布以供 MiNiFi 代理获取。

  1. 在http://<public_dns>:10088/efm/ui/打开 EFM Web UI并选择Monitor选项卡 (

)

  1. 单击EVENTS标题并验证您的 EFM 服务器是否正在接收来自 MiNiFi 代理的心跳。单击心跳记录上的信息图标以查看心跳的详细信息。
  1. 选择流设计器选项卡 (

)。要构建数据流,请从表中选择所需的类 ( iot-1),然后单击OPEN。或者,您可以双击所需的类。

  1. 通过将处理器图标拖到画布上,选择ConsumeMQTT处理器类型并单击“Add”按钮,将ConsumeMQTT处理器添加到画布。处理器位于画布上后,双击它并使用以下设置对其进行配置:
代码语言:javascript
复制
Broker URI:     tcp://<CLUSTER_HOSTNAME>:1883
Client ID:      minifi-iot
Topic Filter:   iot/#
Max Queue Size: 60

并确保在属性页面上向下滚动以设置Topic Filter和Max Queue Size:

  1. 远程处理组(Remote Process Group:RPG) 添加到画布并进行如下配置:
代码语言:javascript
复制
URL:http://<CLUSTER_HOSTNAME>:8080/nifi
Transport Protocol: HTTP
  1. 此时您需要将 ConsumerMQTT 处理器连接到 RPG。为此,您首先需要向远程 NiFi 服务器添加一个输入端口。
  2. 在以下位置打开 NiFi Web UI http://<public_dns>:8080/nifi/
  3. Input Port拖到画布上。
  4. 当提示输入其名称时,将其命名为“from Gateway”,然后单击ADD
  1. 要终止NiFI的Input Port的数据,现在让我们在画布上添加一个Funnel......
  1. …并建立从输入端口到它的连接。要建立连接,请将鼠标悬停在输入端口上,直到箭头符号显示在中心。单击箭头,将其拖放到漏斗上以连接两个元素。
  1. 右键单击输入端口并启动它。或者,单击输入端口将其选中,然后按操作面板上的开始(“play”)按钮:
  1. 您将需要Input Port的 ID来完成ConsumeMQTT处理器与 RPG (NiFi) 的连接。双击Input Port并复制其 ID。
  1. 回到 Flow Designer,将 ConsumeMQTT 处理器连接到 RPG。连接需要一个 ID,您可以在此处粘贴您从输入端口复制的 ID。确保没有空格
  1. 双击连接并更新以下配置:
代码语言:javascript
复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Sensor data
  1. 流程现已完成,但在发布之前,请在NiFi Registry中创建存储桶,以便存储流程的所有版本以供审核和审核。打开 NiFi Registry:http://<public_dns>:18080/nifi-registry,单击右上角的扳手/扳手图标 ( ) 并创建一个名为IoT(注意: 存储桶名称是大小写敏感的) 的存储桶。
  1. 您现在可以发布流以供 MiNiFi 代理自动获取。单击publish,为您的更改添加描述性注释,然后单击Apply。
  1. 返回NiFi Registry Web UI 并单击Cloudera 徽标旁边的NiFi Registry名称。如果流发布成功,您应该在 NiFi 注册表中看到流的版本详细信息。
  1. 此时,您可以测试边缘流直到 NiFi。再次启动 NiFi 模拟器(ExecuteProcess 处理器)并确认您可以看到 NiFi 中排队的消息。
  1. 右键单击队列并选择List Queue以查看收到消息的详细信息。
  1. 尝试单击其中一条消息的Info、Eye和Provenance图标,以分别查看消息属性、内容和出处详细信息。
  1. 例如,每条消息中的传感器读数都包含温度值,这些值都应该在 0 到 100 摄氏度之间。如果您对一些消息的内容进行采样,您应该能够注意到一些读数sensor_0并sensor_1报告了一些虚假值,如下所示。我们将在下一节中解决这个问题。
  1. 您现在可以停止该模拟器(停止 NiFi 处理器)。

实验 3 - 更新流程以在边缘执行额外处理

在之前的实验中,我们注意到一些传感器间歇性地发送错误的测量值。如果我们让这些测量由我们的下游应用程序处理,我们可能会遇到这些应用程序的输出质量问题。

我们可以过滤掉 NiFi 中的错误读数。但是,如果有问题的数据量很大,我们首先会浪费网络带宽将该数据发送到 NiFi。相反,我们要做的是将额外的逻辑推送到边缘,以识别和过滤这些问题,并避免将它们发送到 NiFi 的开销。

我们注意到,问题总是发生在测量中的温度sensor_0和sensor_1,只有。如果这两个温度中的任何一个大于 500,我们必须丢弃整个传感器读数。如果这两个温度都在正常范围内(< 500),我们可以保证报告的所有温度都是正确的,并且可以发送到 NiFi。

  1. 转至 CEM Web UI 并将新处理器添加到画布。在出现的对话框的过滤器框中,键入“JsonPath”。选择EvaluateJSONPath处理器并单击Add
  1. 双击新处理器并使用以下属性对其进行配置:
代码语言:javascript
复制
Processor Name: Extract sensor_0 and sensor1 values
Destination:    flowfile-attribute
  1. 单击Add Property按钮并输入以下属性:

Property Name

Property Value

sensor_0

$.sensor_0

sensor_1

$.sensor_1

  1. 单击Apply以保存处理器配置。
  2. 将一个新的处理器拖到画布上。在出现的对话框的过滤器框中,键入“Routeon”。选择RouteOnAttribute处理器并单击Add
  1. 双击新处理器并使用以下属性对其进行配置:
代码语言:javascript
复制
Processor Name: Filter Errors
Route Strategy: Route to Property name
  1. 单击Add Property按钮并输入以下属性:

Property Name

Property Value

error

${sensor_0:ge(500):or(${sensor_1:ge(500)})}

  1. 单击Apply以保存处理器配置。
  2. ConsumeMQTT处理器重新连接到Extract sensor_0 and sensor1 values处理器:
  3. 单击ConsumeMQTT和RPG之间的现有连接以将其选中。
  4. 将连接的目标端拖动到Extract sensor_0 and sensor1 values处理器。
  1. Extract sensor_0 and sensor1 values连接到Filter Errors处理器。当Create Connection对话框出现时,选择“ matched ”并点击Create
  1. 双击连接,更新以下配置并Apply更改:
代码语言:javascript
复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Extracted attributes
  1. 双击Extract sensor_0 and sensor1 values并复选AUTOMATICALLY TERMINATED RELATIONSHIPS部分中的以下值,然后单击Apply
代码语言:javascript
复制
failure
unmatched
  1. 在创建最后一个连接之前,您将需要(再次) NiFi Input Port的 ID 。转到 NiFi Web UI,双击“from Gateway”输入端口并复制其 ID。
  1. 返回 CEM Web UI,将Filter Errors处理器连接到 RPG:
  1. Create Connection对话框中,选中“ unmatched ”复选框并输入复制的输入端口 ID,然后单击Add:
  1. 双击连接,更新以下配置并Apply更改:
代码语言:javascript
复制
Flowfile Expiration:60 seconds
Back Pressure Object Threshold: 10000
Connection Name:Valid data
  1. 要忽略错误,请双击Filter Errors处理器,选中自动终止关系部分下的error复选框,然后单击应用
  1. 最后,在 CEM 画布上单击ACTIONS > Publish... ,输入描述性注释,例如“添加了错误读数的过滤”,然后单击Publish
  1. 再次启动模拟器。
  2. 转到 NiFi Web UI 并确认数据正在流向 NiFi。检查消息的内容,就像我们之前所做的那样,确认有问题的读数已经消失。

验证数据后停止模拟器。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实验总结
  • 实验 1 - Apache NiFi:设置机器传感器模拟器
  • 实验 2 - 配置边缘流管理
  • 实验 3 - 更新流程以在边缘执行额外处理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档