本教程涵盖了Apache NiFi的核心概念及其在其中流量管理,易用性,安全性,可扩展架构和灵活扩展模型非常重要的环境中所扮演的角色。
我们将创建一个NiFi DataFlow,以将数据从边缘的物联网(IoT)设备传输到流应用程序。
什么是NiFi?
NiFi在此流处理应用程序中扮演什么角色?
要了解什么是NiFi,请访问什么是Apache NiFi?从我们的“使用Apache NiFi分析运输模式”教程中获得。
架构概述
总体而言,我们的数据管道如下所示:
MiNiFi Simulator -----> NiFi ----> Kafka
有一个数据模拟器可复制MiNiFi在IoT边缘数据流中的位置,MiNiFi嵌入在车辆中,因此该模拟器可生成卡车和交通数据。NiFi会摄取此传感器数据。NiFi的流程会对数据进行预处理,以准备将其发送到Kafka。
NiFi的好处
流管理
便于使用
安全
可扩展架构
弹性缩放模型
环境设定
我们将致力于运输物联网项目。如果您安装了最新的Cloudera DataFlow(CDF)沙盒,则该演示已预先安装。
部署NiFi DataFlow
让我们激活NiFi数据流,这样它将处理模拟数据并将数据推送到Kafka主题中。在http://sandbox-cdf.cloudera.com:9090/nifi/上打开NiFi 。如果不是,或者您尚未进行安装,请参阅现有CDF沙箱上的“安装演示”。
该货运物联网组件模板应该出现在NiFi默认画布,如下图所示。
要手动添加Trucking IoT模板,请执行以下操作:
1.将组件模板图标拖放到NiFi画布上。选择“运输物联网”,然后单击“添加”。通过单击画布上的任意位置来取消选择数据流。
2.在“操作面板”中,将手指向上,将其展开(如果已关闭),单击齿轮图标,然后单击“控制器服务”齿轮图标。在Controller Services中,检查状态是否为“ Enabled”,如下图所示。
如果不是“启用”,请执行以下步骤:
3.单击HortonworksSchemaRegistry右侧的Lighting Bolt。
4.对于Scope,选择Service and referenceing componen ...,然后按ENABLE,然后按CLOSE。
5.如步骤2所示,所有Controller Services均应为“ Enabled”。
Note: If any of your services are disabled, you can enable them by clicking on the lightning bolt symbol on the far right of the table. Controller Services are required to be enabled to successfully run the dataflow.
让我们选择整个数据流。保持命令或Ctrl和A,将选择整个数据流。在“操作面板”中,单击“开始”按钮,让其运行1分钟。数据流中每个组件的拐角处的红色停止符号将变为绿色播放符号。您应该看到连接队列中的数字从0变为更高的数字,表明正在处理数据。
您应该看到与以下图像相似的图像:
让我们分析处理器通过NiFi的数据来源采取的行动:
取消选择整个数据流,然后右键单击GetTruckingData:生成两种类型的数据:TruckData和TrafficData。单击查看数据源。
将出现一个带有出处事件的表。一个事件说明了处理器对数据采取了哪种类型的操作。对于GetTruckingData,它将创建两个类别的传感器数据作为一个流。选择与事件20个字节看到TrafficData或大于或等于98个字节看到TruckData。
要查看TruckData或TrafficData传感器数据,请选择要查看i的行左侧的。转到显示内容的标签,然后查看。
您可以检查每个处理器的数据来源,以更深入地了解NiFi正在执行的处理和转换两种类型的模拟数据的步骤。这是显示步骤的流程图:
我们知道NiFi在此Trucking IoT应用程序中扮演的角色。让我们分析一下NiFi DataFlow,以了解其构建方式。让我们深入了解配置控制器服务和配置处理器的过程,以了解如何构建此NiFi DataFlow。
NiFi组件
查看NiFi的核心概念,以了解有关创建NiFi DataFlow的NiFi组件的更多信息。
开始构建NiFi DataFlow
在开始构建NiFi DataFlow之前,请确保我们从干净的画布开始。
Note: You may need to empty queues before deleting DataFlow. Do this by **right-clicking** non-empty queue, then select **Empty queue**.
设置架构注册表控制器服务
作为构建DataFlow的第一步,我们需要设置称为HortonworksSchemaRegistry的NiFi Controller Service 。转到“操作面板”,单击齿轮图标,然后选择“控制器服务”选项卡。要添加新的控制器服务,请按表格右上方的“ +”图标。但是,由于已经创建了该服务,因此我们将对其进行引用,以查看用户如何将NiFi与Schema Registry连接。
HortonworksSchemaRegistry
该控制器服务的“属性”选项卡
属性 | 值 |
---|---|
Schema Registry URL | http://sandbox-hdf.hortonworks.com:7788/api/v1 |
Cache Size | 1000 |
Cache Expiration | 1 hour |
模式用于将数据分为不同的类别:在使用ConvertRecord处理器期间,TruckData和TrafficData将应用于数据。
从上表中的配置中,我们可以看到允许NiFi与Schema Registry进行交互的URL,可以根据架构确定大小的缓存数量,以及直到架构缓存过期和NiFi必须与之通信所需的时间。架构注册表再次。
建立GetTruckingData
NiFi数据模拟器-生成两种类型的数据:TruckData和TrafficData作为CSV字符串。
将“设置”选项卡,“计划”选项卡,“属性”选项卡上的配置保留为默认值。
配置RouteOnAttribute
RouteOnAttribute -过滤TruckData和TrafficData类型分为从两个分开的流GetTruckingData。
右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:
设定标签
设置 | 值 |
---|---|
Automatically Terminate Relationships | unmatched |
其余应保留为默认值。
计划标签
保留默认配置。
属性标签
属性 | 值 |
---|---|
Routing Strategy | Route to Property name |
TrafficData | ${dataType:equals('TrafficData')} |
TruckData | ${dataType:equals('TruckData')} |
建立EnrichTruckData
EnrichTruckData-将天气数据(雾,风,雨)添加到从RouteOnAttribute的TruckData队列传入的每个流文件的内容中。
在即将推出的“自定义NiFi处理器-物联网运输”教程中了解有关构建GetTruckingData处理器的更多信息。
配置ConvertRecord:TruckData
ConvertRecord-使用Controller服务从EnrichTruckData处理器读取传入的CSV TruckData FlowFiles,并使用另一个Controller Service将CSV转换为Avro TruckData FlowFiles。
右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:
设定标签
设置 | 值 |
---|---|
Automatically Terminate Relationships | failure |
计划标签
保留默认配置。
属性标签
属性 | 值 |
---|---|
Record Reader | CSVReader - Enriched Truck Data |
Record Writer | AvroRecordWriter - Enriched Truck Data |
在操作面板中,您可以找到有关此处理器使用的控制器服务的更多信息:
CSVReader-丰富的卡车数据
该控制器服务的“属性”选项卡
属性 | 值 |
---|---|
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | HortonworksSchemaRegistry |
Schema Name | trucking_data_truck_enriched |
Schema Text | $ |
Date Format | No value set |
Time Format | No value set |
Timestamp Format | No value set |
CSV Format | Custom Format |
Value Separator | \| |
Treat First Line as Header | false |
Quote Character | " |
Escape Character | \ |
Comment Marker | No value set |
Null String | No value set |
Trim Fields | true |
AvroRecordWriter-丰富的卡车数据
属性 | 值 |
---|---|
Schema Write Strategy | HWX Content-Encoded Schema Reference |
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | HortonworksSchemaRegistry |
Schema Name | trucking_data_truck_enriched |
Schema Text | $ |
配置ConvertRecord:TrafficData
ConvertRecord-使用Controller服务从RouteOnAttribute的TrafficData队列中读取传入的CSV TrafficData FlowFiles,并使用另一个Controller服务来编写Avro TrafficData FlowFiles。
右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:
设定标签
设置 | 值 |
---|---|
Automatically Terminate Relationships | failure |
计划标签
保留默认配置。
属性标签
属性 | 值 |
---|---|
Record Reader | CSVReader - Traffic Data |
Record Writer | AvroRecordWriter - Traffic Data |
在操作面板中,您可以找到有关此处理器使用的控制器服务的更多信息:
CSVReader-交通数据
该控制器服务的“属性”选项卡
属性 | 值 |
---|---|
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | HortonworksSchemaRegistry |
Schema Name | trucking_data_traffic |
Schema Text | $ |
Date Format | No value set |
Time Format | No value set |
Timestamp Format | No value set |
CSV Format | Custom Format |
Value Separator | \| |
Treat First Line as Header | false |
Quote Character | " |
Escape Character | \ |
Comment Marker | No value set |
Null String | No value set |
Trim Fields | true |
AvroRecordWriter-交通数据
属性 | 值 |
---|---|
Schema Write Strategy | HWX Content-Encoded Schema Reference |
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | HortonworksSchemaRegistry |
Schema Name | trucking_data_truck |
Schema Text | $ |
配置PublishKafka_1_0:TruckData
PublishKafka_1_0 -接收来自flowfiles ConvertRecord - TruckData处理器和发送每个flowfile的内容作为一个消息发送到卡夫卡主题:trucking_data_truck使用卡夫卡生产者API。
右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:
设定标签
设置 | 值 |
---|---|
Automatically Terminate Relationships | failure, success |
计划标签
保留默认配置。
属性标签
属性 | 值 |
---|---|
Kafka Brokers | sandbox-hdf.hortonworks.com:6667 |
Security Protocol | PLAINTEXT |
Topic Name | trucking_data_truck_enriched |
Delivery Guarantee | Best Effort |
Key Attribute Encoding | UTF-8 Encoded |
Max Request Size | 1 MB |
Acknowledgment Wait Time | 5 secs |
Max Metadata Wait Time | 30 sec |
Partitioner class | DefaultPartitioner |
Compression Type | none |
配置PublishKafka_1_0:TrafficData
PublishKafka_1_0-从ConvertRecord-TrafficData处理器接收流文件,并使用Kafka Producer API将FlowFile内容作为消息发送给Kafka主题:trucking_data_traffic。
右键单击处理器,按配置选项以查看不同的配置选项卡及其参数。在每个选项卡中,您将看到以下配置:
设定标签
设置 | 值 |
---|---|
Automatically Terminate Relationships | failure, success |
计划标签
保留默认配置。
属性标签
属性 | 值 |
---|---|
Kafka Brokers | sandbox-hdf.hortonworks.com:6667 |
Security Protocol | PLAINTEXT |
Topic Name | trucking_data_traffic |
Delivery Guarantee | Best Effort |
Key Attribute Encoding | UTF-8 Encoded |
Max Request Size | 1 MB |
Acknowledgment Wait Time | 5 secs |
Max Metadata Wait Time | 5 sec |
Partitioner class | DefaultPartitioner |
Compression Type | none |
恭喜你!现在,您将了解NiFi在Trucking-IoT演示应用程序的数据管道中扮演的角色,以及如何创建和运行数据流。
原文链接:https://www.cloudera.com/tutorials/nifi-in-trucking-iot/3.html