在本次实验中,您将在 Cloudera SQL Stream Builder使用 SQL 语言查询和操作数据流。SQL Stream Builder 是一项功能强大的服务,使您无需编写 Java/Scala 代码即可创建 Flink 作业。
您将从包含温度传感器数据点流的先前实验中创建和填充的iot_enriched主题中获取数据。
本次实验以Edge和Nifi实验中开发的内容为基础。
让我们从一个简单的目标开始:使用 SQL 查询iot_enriched主题的内容,以检查正在流式传输的数据。尽管很简单,但此任务将展示 SQL Stream Builder (SSB) 的易用性和强大功能。
在开始从 Kafka 主题查询数据之前,您需要将 Kafka 集群注册为SSB 中的数据源。
Name: edge2ai-kafka
Brokers: <CLUSTER_HOSTNAME>:9092
Connection protocol: PLAINTEXT
现在您可以将iot_enriched Topic映射到 SQL Stream Builder 中的表。SSB 中的表是一种将 Kafka Topic与Schema相关联的方法,以便您可以在 SQL 查询中使用它。
Table name: iot_enriched
Kafka Cluster: edge2ai-kafka
Topic Name: iot_enriched
Data Format: JSON
从 Kafka 读取的序列化记录提供给record变量中的 Javascript 代码。转换代码的最后一个命令必须返回修改记录的序列化内容。
iot_enriched主题中的数据具有以微秒表示的时间戳。您需要将此字段转换为毫秒。让我们编写一个转换来为我们执行该转换。
单击Transformations选项卡并在代码框中输入以下代码:
// parse the JSON record
var parsedVal = JSON.parse(record.value);
// Convert sensor_ts from micro to milliseconds
parsedVal['sensor_ts'] = Math.round(parsedVal['sensor_ts']/1000);
// serialize output as JSON
JSON.stringify(parsedVal);
为此,请单击事件时间选项卡并配置以下属性:
Use Kafka Timestamps: Uncheck it
Input Timestamp Column: sensor_ts
Event Time Column: event_time
Watermark Seconds: 3
这会将event_time列添加到表中。此列具有TIMESTAMP ROWTIME数据类型,并且派生自sensor_ts列的值。
Consumer Group: ssb-iot-1
笔记 | 为虚拟表设置消费者组属性将确保如果您停止查询并稍后重新启动它,第二个查询执行将继续从第一个查询停止的点读取数据,而不会跳过数据。但是,如果多个查询使用同一个虚拟表,设置此属性将有效地将数据分布在查询中,以便每个记录仅由单个查询读取。如果要与多个不同查询共享虚拟表,请确保未设置 Consumer Group 属性。 |
---|
SELECT
event_time,
sensor_id,
sensor_ts,
is_healthy,
sensor_0,
sensor_1
FROM
iot_enriched
SQL Stream Builder 与Schema Registry的集成自动将存储在注册表中的Schema公开为 SSB 中的表。Schema Registry 中的 schema 名称必须与Kafka中相应的主题名称匹配。
在本实验中,您将在 SSB 中将 Schema Registry 注册为Catalog,以便您可以自动读取iot_enriched_avro以 AVRO 格式存储的主题内容。
https://raw.githubusercontent.com/cloudera-labs/edge2ai-workshop/master/sensor.avsc
Name: iot_enriched_avro
Description: Schema for the data in the iot_enriched_avro topic
Type: Avro schema provider
Schema Group: Kafka
Compatibility: Backward
Evolve: checked
Name: sr
Catalog Type: Schema Registry
Kafka Cluster: edge2ai-kafka
Schema Registry URL: http://<CLUSTER_HOSTNAME>:7788/api/v1
Enable TLS: No
Database Filter: .*
Table Filter: iot.*
单击>_Console > Compose > SQL并键入以下查询:
SELECT *
FROM `sr`.`default_database`.`iot_enriched_avro`
现在您已经运行了一些基本查询并确认您的表工作正常,您希望开始计算传入数据流的聚合并将结果提供给下游应用程序。
SQL Stream Builder 的表使我们能够将流数据发布/存储到几种不同的服务(Kafka、AWS S3、Google GCS、Kudu、HBase 等......)。
在本实验中,您将使用另一个 Kafka 表将聚合结果发布到另一个 Kafka 主题。
Topic name: sensor6_stats
Partitions: 10
Availability: Low
Cleanup Policy: delete
此查询将计算每秒向前滑动的 30 秒窗口内的聚合。对于记录 ( sensor_6) 中的特定传感器值,它为每个窗口计算以下聚合:
INSERT INTO sensor6stats
SELECT
sensor_id as device_id,
HOP_END(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
count(*) as sensorCount,
sum(sensor_6) as sensorSum,
avg(cast(sensor_6 as float)) as sensorAverage,
min(sensor_6) as sensorMin,
max(sensor_6) as sensorMax,
sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan60
FROM iot_enriched
GROUP BY
sensor_id,
HOP(event_time, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
由于我们希望主题格式为 JSON,因此单击Templates > local-kafka > JSON。
这将CREATE TABLE在 SQL 脚本前添加一个 DDL,以创建与查询结构匹配的表!
编辑 DDL 语句并将属性topic的…值替换为实际的主题名称:sensor6_stats。将scan.statup.mode属性的值设置为latest-offset
请注意,屏幕上显示的数据只是查询返回的数据的样本,而不是完整的数据。
现在您只需要查询同一张表。
返回 SSB UI,单击New job以清除 SQL Compose 字段。
笔记 | 该Sensor6Stats作业将继续在后台运行。您可以通过SQL Jobs页面对其进行监控和管理。 |
---|
SELECT *
FROM sensor6stats
SQL Stream Builder 还可以获取数据流的键控快照,并通过 REST 接口以实体化视图的形式提供这些快照。在本实验中,您将创建和查询物化视图 (MV)。
您将在上一个实验中创建的查询之上定义 MV。在执行以下步骤之前确保查询正在运行。
编辑 SQL Compose 字段以删除整个CREATE TABLE语句。该字段中唯一剩下的应该是实际的INSERT … SELECT语句,如下所示:
Materialized View: Enabled
Primary Key: device_id
Retention: 300
Ignore NULLs: False
如果您已经在 SSB 中创建了 API Key,您可以从下拉列表中选择它。否则,通过单击上面显示的“添加 API Key”按钮在现场创建一个。用作ssb-lab键名。
URL Pattern: above60
Query Builder: <click "Select All" to add all columns>
Filters: sensorGreatThan60 greater 0
如果您刷新页面几次,您会注意到 MV 快照随着新数据点通过流而更新。
SSB 为定义的主键的每个值保留数据的最后状态。
您在上面创建的 MV 没有参数;当您调用 REST 端点时,它总是返回 MV 的完整内容。可以为 MV 指定参数,以便在查询时过滤内容。
在本节中,您将创建一个允许通过指定sensorAverage列的范围进行过滤的新 MV。
URL Pattern: above60withRange/{lowerTemp}/{upperTemp}
Query Builder: <click "Select All" to add all columns>
Filters: sensorGreatThan60 greater 0
AND
sensorAverage greater or equal {lowerTemp}
AND
sensorAverage less or equal {upperTemp}
您现在已经从一个主题中获取数据,计算了汇总结果并将其写入另一个主题。为了验证这是否成功,您使用独立的选择查询选择了结果。最后,您为其中一项作业创建了物化视图,并通过它们的 REST 端点查询了这些视图。