在本次实验中,您将实施一个数据管道来处理之前从边缘捕获的数据。您将使用 NiFi 将这些数据摄取到 Kafka,然后使用来自 Kafka 的数据并将其写入 Kudu 表。
该实验以Edge Workshop中开发的内容为基础。
温度传感器产生的数据由文件中的模式描述sensor.avsc。在本实验中,我们将在 Schema Registry 中注册此Schema,以便我们在 NiFi 中的流可以使用统一服务引用Schema。这也将允许我们在未来Schema发送变化,如果需要的话,将旧版本保持在版本控制之下,以便现有的流和流文件将继续工作。
Name: SensorReading
Description: Schema for the data generated by the IoT sensors
Type: Avro schema provider
Schema Group: Kafka
Compatibility: Backward
Evolve: checked
在本实验中,您将创建一个 NiFi 流来接收来自网关所有的数据并将其推送到Kafka。
在开始构建流程之前,让我们创建一个处理组来帮助组织 NiFi 画布中的流程并启用流程版本控制。
Name: NiFi Registry
URL: http://<CLUSTER_HOSTNAME>:18080
返回NiFi Web UI,为处理组启用版本控制,右键单击它并选择Version > Start version control并输入下面的详细信息。完成后,处理组上将出现一个 ,表示现在已为其启用版本控制。
Registry: NiFi Registry
Bucket: SensorFlows
Flow Name: SensorProcessGroup
URL:http://<CLUSTER_HOSTNAME>:7788/api/v1
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
Schema Name: ${schema.name} -> already set by default!
Schema Write Strategy: HWX Schema Reference Attributes
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
在处理组内,添加一个新的Input Port并将其命名为“Sensor Data”。在Receive From字段中,选择Local connections。
通过将处理器图标拖到画布上来添加UpdateAttribute处理器:
Name:Set Schema Name
Property Name: schema.name
Property Value: SensorReading
Name: Publish to Kafka topic: iot
Kafka Brokers: <CLUSTER_HOSTNAME>:9092
Topic Name: iot
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Use Transactions: false
Attributes to Send as Headers (Regex): schema.*
笔记 | 确保您使用的是 PublishKafkaRecord_2.6 处理器而不是PublishKafka_2.6 处理器 |
---|
Property Name: client.id
Property Value: nifi-sensor-data
稍后,这将帮助我们清楚地识别谁在将数据生成到 Kafka 主题中。
此时,消息已经在 Kafka 主题中。您可以根据需要添加更多处理器来处理、拆分、复制或重新路由您的 FlowFile 到所有其他目的地和处理器。
现在我们的 NiFi 流程正在将数据推送到 Kafka,最好确认一切都按预期运行。在本实验中,您将使用 Streams Messaging Manager (SMM) 检查和监控 Kafka。
在本实验中,您将使用 NiFi 消费包含我们在上一个实验中摄取的 IoT 数据的 Kafka 消息,调用 CDSW 模型 API 端点来预测读取数据的机器是否可能发生故障。
为了准备实验,我们在集群上运行的 Cloudera Data Science Workbench (CDSW) 上训练并部署了一个机器学习模型。模型 API 可以获取传感器提供的 12 个温度读数的特征向量,并根据该向量预测机器是否可能发生故障。
在您将在本实验中构建的流程中,您将使用一些处理器/控制器服务将引用的变量:
这些变量指定访问在 CDSW 中运行的机器学习模型所必需的键。按照以下步骤从 CDSW 检索密钥并在 NiFi 中设置变量及其值。
Variable Name: cdsw.access.key
Variable Value: <access Key copied from CDSW>
Variable Name: cdsw.model.api.key
Variable Value: <key copied from CDSW>
当传感器数据使用PublishKafkaRecord处理器发送到 Kafka 时,我们选择在 Kafka 消息的标头中附加模式信息。现在,我们可以利用元数据为每条消息动态加载正确的模式,而不是硬编码我们应该使用哪个模式来读取消息。
但是,要做到这一点,我们需要配置一个不同的JsonTreeReader,它将使用标头中的模式属性,而不是${schema.name}像以前那样使用属性。
我们还将添加一个新的RestLookupService控制器服务来执行对 CDSW 模型 API 端点的调用。
在Settings选项卡上:
Name: JsonTreeReader - With schema identifier
在Properties选项卡上:
Schema Access Strategy: HWX Schema Reference Attributes
Schema Registry: HortonworksSchemaRegistry
在Properties选项卡上:
URL:http://modelservice.cdsw.<YOUR_CLUSTER_PUBLIC_IP>.nip.io/model
Record Reader: JsonTreeReader
Record Path: /response
笔记 | <YOUR_CLUSTER_PUBLIC_IP>以上必须替换为您的集群的公共 IP,而不是DNS 名称。最终 URL 应如下所示:http://modelservice.cdsw.12.34.56.78.nip.io/model |
---|
Authorization: Bearer ${cdsw.model.api.key}
我们现在将创建流程以从 Kafka 读取传感器数据,为每个传感器执行模型预测并将结果写入 Kudu。在本节结束时,您的流程应如下所示:
Settings选项卡:
Name: Consume Kafka iot messages
Properties选项卡:
Kafka Brokers: <CLUSTER_HOSTNAME>:9092
Topic Name(s): iot
Topic Name Format: names
Record Reader: JsonTreeReader - With schema identifier
Record Writer: JsonRecordSetWriter
Honor Transactions: false
Group ID: iot-sensor-consumer
Offset Reset: latest
Headers to Add as Attributes (Regex): schema.*
Settings选项卡:
Name: Predict machine health
Properties选项卡:
Record Reader: JsonTreeReader - With schema identifier
Record Writer: JsonRecordSetWriter
Lookup Service: RestLookupService
Result RecordPath: /response
Routing Strategy: Route to 'success'
Record Result Contents: Insert Entire Record
mime.type: toString('application/json', 'UTF-8')
request.body: concat('{"accessKey":"', '${cdsw.access.key}', '","request":{"feature":"', /sensor_0, ', ', /sensor_1, ', ', /sensor_2, ', ', /sensor_3, ', ', /sensor_4, ', ', /sensor_5, ', ', /sensor_6, ', ', /sensor_7, ', ', /sensor_8, ', ', /sensor_9, ', ', /sensor_10, ', ', /sensor_11, '"}}')
request.method: toString('post', 'UTF-8')
单击Apply以将更改保存到Predict machine health处理器。
Settings选项卡:
Name: Update health flag
Properties选项卡:
Record Reader: JsonTreeReader - With schema identifier
Record Writer: JsonRecordSetWriter
Replacement Value Strategy: Record Path Value
/is_healthy: /response/result
在下一部分中,您将在 NiFi 中配置PutKudu处理器以将数据写入 Kudu 表。在配置该处理器之前,让我们创建 Kudu 表。
笔记 | 如果您已经在之前的实验中创建了此表,则可以跳过以下创建步骤。 |
---|
登录到 Hue,然后在Impala 查询编辑器中,运行以下语句:
CREATE TABLE sensors
(
sensor_id INT,
sensor_ts BIGINT,
sensor_0 DOUBLE,
sensor_1 DOUBLE,
sensor_2 DOUBLE,
sensor_3 DOUBLE,
sensor_4 DOUBLE,
sensor_5 DOUBLE,
sensor_6 DOUBLE,
sensor_7 DOUBLE,
sensor_8 DOUBLE,
sensor_9 DOUBLE,
sensor_10 DOUBLE,
sensor_11 DOUBLE,
is_healthy INT,
PRIMARY KEY (sensor_id, sensor_ts)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
您可以在 Hue 的表格浏览器中找到 Kudu 表的名称。
单击左侧的表浏览器default图标并导航到数据库。单击sensors表并打开其详细信息选项卡。
记下 Kudu的表名。
设置选项卡:
Name: Write to Kudu
属性选项卡:
Kudu Masters: <CLUSTER_HOSTNAME>:7051
Table Name: <KUDU_TABLE_NAME (see previous section)>
Record Reader: JsonTreeReader - With schema identifier
我们现在已经准备好运行和测试我们的流程了。请按照以下步骤操作:
在本实验中,您将使用 Impala 引擎运行一些 SQL 查询,并验证 Kudu 表是否按预期更新。
SELECT count(*)
FROM sensors;
SELECT *
FROM sensors
ORDER by sensor_ts DESC
LIMIT 100;
select is_healthy,count(*) from sensors group by is_healthy