前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >教程|运输IoT中的NiFi

教程|运输IoT中的NiFi

作者头像
大数据杂货铺
发布2021-04-16 11:37:18
2.3K0
发布2021-04-16 11:37:18
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

介绍

本教程涵盖了Apache NiFi的核心概念及其在其中流量管理,易用性,安全性,可扩展架构和灵活扩展模型非常重要的环境中所扮演的角色。

我们将创建一个NiFi DataFlow,以将数据从边缘的物联网(IoT)设备传输到流应用程序。

运输IoT用例中的NiFi

什么是NiFi?

NiFi在此流处理应用程序中扮演什么角色?

  • NiFi充当生产者,从卡车和交通IoT设备获取数据,对数据进行简单的事件处理,以便可以将其拆分为TruckData和TrafficData,并可以将其作为消息发送到两个Kafka主题。

要了解什么是NiFi,请访问什么是Apache NiFi?从我们的“使用Apache NiFi分析运输模式”教程中获得。

架构概述

总体而言,我们的数据管道如下所示:

MiNiFi Simulator -----> NiFi ----> Kafka

有一个数据模拟器可复制MiNiFi在IoT边缘数据流中的位置,MiNiFi嵌入在车辆中,因此该模拟器可生成卡车和交通数据。NiFi会摄取此传感器数据。NiFi的流程会对数据进行预处理,以准备将其发送到Kafka。

NiFi的好处

流管理

  • 保证交付:持久的预写日志和内容存储库实现了很高的事务处理率,有效的负载分散,写时复制,并发挥了传统磁盘读/写的优势。
  • 具有背压和泄压功能的数据缓冲:如果将数据推送到队列中达到指定的限制,则NiFi将停止进程将数据发送到该队列中。数据达到一定期限后,NiFi会终止数据。
  • 优先级队列:一种设置,用于基于最大、最小、最旧或其他自定义优先级排序方案从队列中检索数据的方式。
  • 流特定QoS:针对特定数据的流特定配置,这些数据不容许丢失,并且其值根据时间敏感性而变小。

便于使用

  • 可视化命令和控制:实时可视化建立数据流,因此在数据流中进行的任何更改都将立即发生。这些更改仅隔离到受影响的组件,因此不需要停止整个流程或一组流程来进行修改。
  • 流程模板:一种构建和发布流程设计以使他人和协作受益的方法。
  • 数据来源:在数据流过系统时自动记录数据并建立索引。
  • 恢复/记录细粒度历史的滚动缓冲区:提供对内容的单击,内容的下载以及在对象生命周期中特定时间点的所有内容的重播。

安全

  • 系统到系统:通过使用加密协议提供安全的交换,并使流程能够加密和解密内容,并在发送方/接收方等式的任一侧使用共享密钥。
  • 用户到系统:启用2-Way SSL身份验证并提供可插入的授权,因此它可以适当地控制用户的访问权限和特定级别(只读,数据流管理器,admin)。
  • 多租户授权:允许每个团队管理流程,即使他们没有访问权限,也可以完全了解整个流程。

可扩展架构

  • 扩展:连接数据系统,无论数据系统A与系统B有多么不同,数据流过程都会在数据上执行并交互,以创建单线或双向通信线路。
  • 类加载器隔离:NiFi提供了一个自定义类加载器,以确保每个扩展包都尽可能独立,因此基于组件的依赖关系问题不会经常发生。因此,可以创建扩展束,而不必担心与另一个扩展发生冲突。
  • 站点到站点通信协议:轻松、高效、安全地将数据从一个NiFi实例传输到另一个实例。因此,嵌入NiFi的设备可以通过S2S相互通信,S2S支持基于套接字的协议和HTTP(S)协议。

弹性缩放模型

  • 横向扩展(集群):将多个节点集群在一起。因此,如果每个节点每秒能够处理数百MB,那么一个节点集群可以每秒处理GB。
  • 放大和缩小:增加处理器上的并发任务数量,以允许更多进程同时运行,或者减少此数量,使NiFi适合在硬件资源有限的边缘设备上运行。查看MiNiFi子项目,以了解有关解决此小规模数据挑战的更多信息。

演示运行NiFi

环境设定

我们将致力于运输物联网项目。如果您安装了最新的Cloudera DataFlow(CDF)沙盒,则该演示已预先安装。

部署NiFi DataFlow

让我们激活NiFi数据流,这样它将处理模拟数据并将数据推送到Kafka主题中。在http://sandbox-cdf.cloudera.com:9090/nifi/上打开NiFi 。如果不是,或者您尚未进行安装,请参阅现有CDF沙箱上的“安装演示”。

该货运物联网组件模板应该出现在NiFi默认画布,如下图所示。

要手动添加Trucking IoT模板,请执行以下操作:

1.将组件模板图标拖放到NiFi画布上。选择“运输物联网”,然后单击“添加”。通过单击画布上的任意位置来取消选择数据流。

2.在“操作面板”中,将手指向上,将其展开(如果已关闭),单击齿轮图标,然后单击“控制器服务”齿轮图标。在Controller Services中,检查状态是否为“ Enabled”,如下图所示。

如果不是“启用”,请执行以下步骤:

3.单击HortonworksSchemaRegistry右侧的Lighting Bolt。

4.对于Scope,选择Service and referenceing componen ...,然后按ENABLE,然后按CLOSE。

5.如步骤2所示,所有Controller Services均应为“ Enabled”。

Note: If any of your services are disabled, you can enable them by clicking on the lightning bolt symbol on the far right of the table. Controller Services are required to be enabled to successfully run the dataflow.

让我们选择整个数据流。保持命令或Ctrl和A,将选择整个数据流。在“操作面板”中,单击“开始”按钮,让其运行1分钟。数据流中每个组件的拐角处的红色停止符号将变为绿色播放符号。您应该看到连接队列中的数字从0变为更高的数字,表明正在处理数据。

您应该看到与以下图像相似的图像:

让我们分析处理器通过NiFi的数据来源采取的行动:

取消选择整个数据流,然后右键单击GetTruckingData:生成两种类型的数据:TruckDataTrafficData。单击查看数据源。

将出现一个带有出处事件的表。一个事件说明了处理器对数据采取了哪种类型的操作。对于GetTruckingData,它将创建两个类别的传感器数据作为一个流。选择与事件20个字节看到TrafficData或大于或等于98个字节看到TruckData。

要查看TruckData或TrafficData传感器数据,请选择要查看i的行左侧的。转到显示内容的标签,然后查看。

  • TruckData:由每辆卡车上的传感器模拟的数据。
  • TrafficData:根据特定货运路线上的交通拥堵情况模拟的数据。

您可以检查每个处理器的数据来源,以更深入地了解NiFi正在执行的处理和转换两种类型的模拟数据的步骤。这是显示步骤的流程图:

创建NiFi数据流

我们知道NiFi在此Trucking IoT应用程序中扮演的角色。让我们分析一下NiFi DataFlow,以了解其构建方式。让我们深入了解配置控制器服务和配置处理器的过程,以了解如何构建此NiFi DataFlow。

NiFi组件

查看NiFi的核心概念,以了解有关创建NiFi DataFlow的NiFi组件的更多信息。

开始构建NiFi DataFlow

在开始构建NiFi DataFlow之前,请确保我们从干净的画布开始。

  • 按CTRL-A或COMMAND-A选择整个画布
  • 在“操作面板”上,单击“删除”

Note: You may need to empty queues before deleting DataFlow. Do this by **right-clicking** non-empty queue, then select **Empty queue**.

设置架构注册表控制器服务

作为构建DataFlow的第一步,我们需要设置称为HortonworksSchemaRegistry的NiFi Controller Service 。转到“操作面板”,单击齿轮图标,然后选择“控制器服务”选项卡。要添加新的控制器服务,请按表格右上方的“ +”图标。但是,由于已经创建了该服务,因此我们将对其进行引用,以查看用户如何将NiFi与Schema Registry连接。

HortonworksSchemaRegistry

该控制器服务的“属性”选项卡

属性

Schema Registry URL

http://sandbox-hdf.hortonworks.com:7788/api/v1

Cache Size

1000

Cache Expiration

1 hour

模式用于将数据分为不同的类别:在使用ConvertRecord处理器期间,TruckData和TrafficData将应用于数据。

从上表中的配置中,我们可以看到允许NiFi与Schema Registry进行交互的URL,可以根据架构确定大小的缓存数量,以及直到架构缓存过期和NiFi必须与之通信所需的时间。架构注册表再次。

建立GetTruckingData

NiFi数据模拟器-生成两种类型的数据:TruckData和TrafficData作为CSV字符串。

将“设置”选项卡,“计划”选项卡,“属性”选项卡上的配置保留为默认值。

配置RouteOnAttribute

RouteOnAttribute -过滤TruckData和TrafficData类型分为从两个分开的流GetTruckingData

右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:

设定标签

设置

Automatically Terminate Relationships

unmatched

其余应保留为默认值。

计划标签

保留默认配置。

属性标签

属性

Routing Strategy

Route to Property name

TrafficData

${dataType:equals('TrafficData')}

TruckData

${dataType:equals('TruckData')}

建立EnrichTruckData

EnrichTruckData-将天气数据(雾,风,雨)添加到从RouteOnAttribute的TruckData队列传入的每个流文件的内容中。

在即将推出的“自定义NiFi处理器-物联网运输”教程中了解有关构建GetTruckingData处理器的更多信息。

配置ConvertRecord:TruckData

ConvertRecord-使用Controller服务从EnrichTruckData处理器读取传入的CSV TruckData FlowFiles,并使用另一个Controller Service将CSV转换为Avro TruckData FlowFiles。

右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:

设定标签

设置

Automatically Terminate Relationships

failure

计划标签

保留默认配置。

属性标签

属性

Record Reader

CSVReader - Enriched Truck Data

Record Writer

AvroRecordWriter - Enriched Truck Data

在操作面板中,您可以找到有关此处理器使用的控制器服务的更多信息:

CSVReader-丰富的卡车数据

该控制器服务的“属性”选项卡

属性

Schema Access Strategy

Use 'Schema Name' Property

Schema Registry

HortonworksSchemaRegistry

Schema Name

trucking_data_truck_enriched

Schema Text

$

Date Format

No value set

Time Format

No value set

Timestamp Format

No value set

CSV Format

Custom Format

Value Separator

\|

Treat First Line as Header

false

Quote Character

"

Escape Character

\

Comment Marker

No value set

Null String

No value set

Trim Fields

true

AvroRecordWriter-丰富的卡车数据

属性

Schema Write Strategy

HWX Content-Encoded Schema Reference

Schema Access Strategy

Use 'Schema Name' Property

Schema Registry

HortonworksSchemaRegistry

Schema Name

trucking_data_truck_enriched

Schema Text

$

配置ConvertRecord:TrafficData

ConvertRecord-使用Controller服务从RouteOnAttribute的TrafficData队列中读取传入的CSV TrafficData FlowFiles,并使用另一个Controller服务来编写Avro TrafficData FlowFiles。

右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:

设定标签

设置

Automatically Terminate Relationships

failure

计划标签

保留默认配置。

属性标签

属性

Record Reader

CSVReader - Traffic Data

Record Writer

AvroRecordWriter - Traffic Data

在操作面板中,您可以找到有关此处理器使用的控制器服务的更多信息:

CSVReader-交通数据

该控制器服务的“属性”选项卡

属性

Schema Access Strategy

Use 'Schema Name' Property

Schema Registry

HortonworksSchemaRegistry

Schema Name

trucking_data_traffic

Schema Text

$

Date Format

No value set

Time Format

No value set

Timestamp Format

No value set

CSV Format

Custom Format

Value Separator

\|

Treat First Line as Header

false

Quote Character

"

Escape Character

\

Comment Marker

No value set

Null String

No value set

Trim Fields

true

AvroRecordWriter-交通数据

属性

Schema Write Strategy

HWX Content-Encoded Schema Reference

Schema Access Strategy

Use 'Schema Name' Property

Schema Registry

HortonworksSchemaRegistry

Schema Name

trucking_data_truck

Schema Text

$

配置PublishKafka_1_0:TruckData

PublishKafka_1_0 -接收来自flowfiles ConvertRecord - TruckData处理器和发送每个flowfile的内容作为一个消息发送到卡夫卡主题:trucking_data_truck使用卡夫卡生产者API。

右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:

设定标签

设置

Automatically Terminate Relationships

failure, success

计划标签

保留默认配置。

属性标签

属性

Kafka Brokers

sandbox-hdf.hortonworks.com:6667

Security Protocol

PLAINTEXT

Topic Name

trucking_data_truck_enriched

Delivery Guarantee

Best Effort

Key Attribute Encoding

UTF-8 Encoded

Max Request Size

1 MB

Acknowledgment Wait Time

5 secs

Max Metadata Wait Time

30 sec

Partitioner class

DefaultPartitioner

Compression Type

none

配置PublishKafka_1_0:TrafficData

PublishKafka_1_0-从ConvertRecord-TrafficData处理器接收流文件,并使用Kafka Producer API将FlowFile内容作为消息发送给Kafka主题:trucking_data_traffic

右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:

设定标签

设置

Automatically Terminate Relationships

failure, success

计划标签

保留默认配置。

属性标签

属性

Kafka Brokers

sandbox-hdf.hortonworks.com:6667

Security Protocol

PLAINTEXT

Topic Name

trucking_data_traffic

Delivery Guarantee

Best Effort

Key Attribute Encoding

UTF-8 Encoded

Max Request Size

1 MB

Acknowledgment Wait Time

5 secs

Max Metadata Wait Time

5 sec

Partitioner class

DefaultPartitioner

Compression Type

none

总结

恭喜你!现在,您将了解NiFi在Trucking-IoT演示应用程序的数据管道中扮演的角色,以及如何创建和运行数据流。

原文链接:https://www.cloudera.com/tutorials/nifi-in-trucking-iot/3.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 运输IoT用例中的NiFi
  • 演示运行NiFi
  • 创建NiFi数据流
  • 总结
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档