原文翻译自 DZone,根据原文意译。
作者使用了 Cloudera 私有云构建,架构图如下:
本文是关于如何在实时分析中使用云原生应用程序对股票数据进行连续 SQL 操作的教程。
我想使用 Apache NiFi 读取 REST API 来频繁地跟踪一些公司的股票。之后我得到一些数据流分析要使用 Apache Flink SQL 执行,最后使用 Apache Impala 查询 Apache Kudu 中的存储的数据。
我将在下面向您展示如何在几秒钟内在云原生应用程序中构建它。
源代码:https : //github.com/tspannhw/SmartStocks
用脚本加载schema、表、警报,请参阅 scripts/setup.sh:
源代码:https : //github.com/tspannhw/ApacheConAtHome2020
一旦我们的自动化管理员构建了我们的云环境并用我们的应用程序的优点填充它,我们就可以开始我们的持续执行的 SQL。如果你知道你的数据,建立一个 Schema,与注册中心共享.
我们添加的一项独特n内容是Avro Schema中的默认值,并将其设为时间戳毫秒的逻辑类型。这对 Flink SQL 时间戳相关查询很有帮助。
{ "name" : "dt", "type" : "long", "default": 1, "logicalType": "timestamp-millis"}
我们还需要一个关于股票警报的 Topic,稍后我们将使用 Flink SQL 创建该主题,因此让我们也为此定义一个模式。
对于今天的数据,我们将使用带有 AVRO Schema 的 AVRO 格式数据,以便在 Kafka Topic 中使用,无论谁将使用它。
现在我们正在将数据流式传输到 Kafka 主题,我们可以在 Flink SQL 连续 SQL 应用程序、NiFi 应用程序、Spark 3 应用程序等中使用它。所以在这种情况下,CFM NiFi 是我们的生产者,我们将拥有 CFM NiFi 和 CSA Flink SQL 作为 Kafka 消费者。
我们可以看到我们的数据在新的清理格式和我们需要的所有字段中的样子。
Cloudera Streams Messaging Manager 通过一个易于使用的预集成 UI 解决了所有这些难题。它预先连接到我的 Kafka Datahubs 并使用 SDX 进行保护。
我可以看到我的 AVRO 数据与相关的股票 schema 在 Topic 中,并且可以被消费。然后,我可以监控谁在消费、消费了多少,以及是否存在滞后或延迟。
消费AVRO 数据股票的schema,然后写入我们在Cloudera的数据平台由Apache Impala和Apache Kudu支持的实时数据集市。如果出现故障或无法连接,让我们重试 3 次。
我们使用 3+ 个 Kafka broker 。我们还可以有 Topic 名称和 consumer 名称的参数。我们从使用由 NiFi 自动准备好的 Kafka 标头中引用的股票 Schema 的股票表中读取。当我们向 Kafka 发送消息时,Nifi 通过NiFi 中的schema.name属性传递我们的 Schema 名称。正如我们所看到的,它是附加 Avro 的Schema,所以我们使用该 Reader 并使用该模式转换为简单的 JSON。
写入我们的云原生实时数据集市再简单不过了,我们引用了我们创建的股票表,并有权限使用 JSON Reader。我喜欢UPSERT,因为它能够处理 INSERT 和 UPDATE。
首先,我们需要在 Apache Hue 中从 CDP 或从脚本编写的命令行创建我们的 Kudu 表。
示例:impala-shell -i edge2ai-1.dim.local -d default -f /opt/demo/sql/kudu.sql
CREATE TABLEstocks( uuid STRING,
datetime
STRING,symbol
STRING,open
STRING,close
STRING,high
STRING,volume
STRING,ts
TIMESTAMP,dt
TIMESTAMP,low
STRING,PRIMARY KEY (uuid,datetime
) ) PARTITION BY HASH PARTITIONS 4 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
使用集成到 CDP 中的 Apache Hue,我可以检查我的实时数据集市表,然后查询表。
我的数据现在已准备好用于报告、仪表板、应用、笔记本、Web 应用程序、移动应用程序和机器学习。
我现在可以在几秒钟内在这张桌子上启动一个 Cloudera 可视化应用程序。
现在我们可以在 Flink 中构建我们的流分析应用程序。
我可以从命令行 Flink SQL Client 连接到 Flink SQL 开始探索我的 Kafka 和 Kudu 数据,创建临时表,并启动一些应用程序(插入语句)。该环境让我可以看到所有不同的可用目录,包括注册表(Cloudera Cloud Schema Registry)、hive(云原生数据库表)和 kudu(Cloudera 实时云数据集市)表。
1. 运行 Flink SQL 客户端
flink-yarn-session -tm 2048 -s 2 -d
flink-sql-client 嵌入式 -e sql-env.yaml
2. 运行 Flink SQL
3. 跨目录查询股票的 Kafka Topic
Select * from registry.default_database.stocks;
4. 对股票 Kudu/Impala 表的跨目录查询
Select * from kudu.default_database.impala::default.stocks;
5. 默认 Catalog
Use catalog default_catalog;
CREATE TABLE stockEvents ( symbol STRING, uuid STRING, ts BIGINT, dt BIGINT, datetime STRING, open STRING, close STRING, high STRING, volume STRING, low STRING,
event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND )
WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal',
'connector.topic' = 'stocks',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
'format.type' = 'registry',
'format.registry.properties.schema.registry.url' = 'http://edge2ai-1.dim.local:7788/api/v1' );
show tables;;
Flink SQL> describe stockEvents;
root |-- symbol: STRING |-- uuid: STRING |-- ts: BIGINT |-- dt: BIGINT |-- datetime: STRING |--
open: STRING |-- close: STRING |-- high: STRING |-- volume: STRING |-- low: STRING |--
event_time: TIMESTAMP(3) AS CAST(FROM_UNIXTIME(FLOOR(ts / 1000)) AS TIMESTAMP(3)) |--
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
我们添加了从时间戳中提取的 watermark 和事件时间。
6. 简单的全选查询
Select * from default_catalog.default_database.stockEvents;
我们可以对我们创建的这个表进行一些有趣的查询。
7. 翻滚窗口
SELECT symbol
8. TUMBLE_START
(event_time, INTERVAL '1' MINUTE) as tumbleStart,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as tumbleEnd,
AVG(CAST(high as DOUBLE)) as avgHigh
FROM stockEvents
WHERE symbol is not null
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), symbol;
9. Top 3
SELECT *
FROM
( SELECT * , ROW_NUMBER() OVER
( PARTITION BY window_start ORDER BY num_stocks desc ) AS rownum
FROM (
SELECT TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
symbol,
COUNT(*) AS num_stocks
FROM stockEvents
GROUP BY symbol,
TUMBLE(event_time, INTERVAL '10' MINUTE) ) )
WHERE rownum <=3;
10. 股票警报
INSERT INTO stockalerts
/*+ OPTIONS('sink.partitioner'='round-robin') */
SELECT CAST(symbol as STRING) symbol,
CAST(uuid as STRING) uuid, ts, dt, open, close, high, volume, low,
datetime, 'new-high' message, 'nh' alertcode, CAST(CURRENT_TIMESTAMP AS BIGINT) alerttime FROM stocks st
WHERE symbol is not null
AND symbol <> 'null'
AND trim(symbol) <> ''
AND CAST(close as DOUBLE) > 11;
使用 CSA Flink Global Dashboard,我可以看到我所有的 Flink 作业正在运行,包括 SQL 客户端作业、断开连接的 Flink SQL 插入和部署的 Flink 应用程序。
我们还可以看到在股票警报 Topic 中热门的数据。我们可以针对这些数据运行 Flink SQL、Spark 3、NiFi 或其他应用程序来处理警报。那可能是下一个应用程序,我可能会将这些警报发送到 iPhone 消息、Slack 消息、数据库表和 WebSockets 应用程序。
我们都知道 NiFi 拥有深厚的数据血缘,可以通过 REST、报告任务或 CLI 推送或拉取,以用于审计、指标和跟踪。如果我想要整个流媒体管道的所有治理后的数据,我将使用 Apache Atlas,它在我的云数据平台中作为 SDX 一部分预先连接的数据。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。