首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Cloudera流分析中引入FlinkSQL

Cloudera流分析中引入FlinkSQL

作者头像
大数据杂货铺
发布2020-07-14 10:07:25
5830
发布2020-07-14 10:07:25
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

由Apache Flink提供支持的Cloudera Streaming Analytics的1.2.0.0版本提供了广泛的新功能 ,包括通过Apache Atlas 支持血缘和元数据跟踪,支持连接到Apache Kudu 以及期待已久的FlinkSQL API 的第一次迭代。

Flink的SQL接口使流处理民主化,因为它可以迎合比当前广泛使用的Java和Scala API(面向数据工程人群)更大的社区。将SQL推广到流处理和流分析用例提出了一系列挑战:我们必须解决表达无限流和记录的及时性的问题。让我们考虑以下查询:

SELECT
  userId,
  COUNT(*) AS count,
  SESSION_START(clicktime, INTERVAL '30' MINUTE)
FROM clicks
GROUP BY
  SESSION(clicktime, INTERVAL '30' MINUTE)

  userId

该查询产生每个用户会话的点击计数,该计数由会话之间30分钟的不活动状态定义,并在遇到新会话时实时更新。这是在流处理中已经很好建立的概念的示例,在这种情况下,会话窗口被引入到SQL语法中以表示记录的及时性。重要的是要强调Flink支持的语法是ANSI SQL,它不是特定的方言。实际上,Flink社区正在与Apache Beam和Apache Calcite社区合作,以统一的方式 应对FlinkSQL的挑战。

转换到流处理组织

从上述查询来看,很明显,更大的用户群可以有效地制定查询,从而为企业增加价值。但是,它给组织带来了以下问题:

1) 在流媒体领域中可以用SQL制定多少业务逻辑?

2) 这如何改变从开发到生产的流式作业旅程?

3) 这如何影响数据工程团队的范围?

我们认为,今天编写的大多数流查询都可以通过FlinkSQL表示,以提供有根据的猜测,我们希望它能达到今天遇到的流查询的80%左右,这很适合通过此SQL实现API。首先,这似乎有些夸大其词,我们将在下一部分中详细介绍。

当前,我们经常遇到使用Flink的组织,其中近实时获得业务价值是数据工程师的特权。数据分析人员通常是特定领域知识的专家,他们倾向于使用标准MPP或OLAP系统中存储的这些流的快照,例如通过Apache Impala查询存储在Kudu中的数据。这从本质上引入了寻找以流的方式对其进行洞察和生产化。分析师在证实其假设之后,必须与几个数据工程师确保数周甚至数月的项目资金投入,以细致地重新实现已经用另一种语言(通常是SQL)制定的业务逻辑。FlinkSQL使分析人员可以直接与流进行交互,并单击按钮即可部署流作业。

反过来,这又解放了数据工程师,使他们可以专注于具有挑战性的20%的查询,并建立可重用的特定领域的库,这些库可以直接从SQL中作为一组用户自定义函数加以利用。

FlinkSQL的功能

为了展示FlinkSQL的功能,我们最近在我们的标准教程套件 下发布了SQL 教程 。让我们在这里重点介绍一些功能。

本教程针对Apache Kafka主题进行操作,其中包含JSON格式的事务条目。让我们为此定义一个表Schema,并指定我们要测量timestamp列记录的时间的流逝(称为event-time语义 )。

CREATE TABLE ItemTransactions (
transactionId    BIGINT,
`timestamp`    BIGINT,
itemId    STRING,
quantity INT,
event_time AS CAST(from_unixtime(floor(`timestamp`/1000)) AS TIMESTAMP(3)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector.type'      = 'kafka',
'connector.version'   = 'universal',
'connector.topic'     = 'transaction.log.1',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = '<broker_address>',
'format.type' = 'json'
);

请注意,在使用事件时间语义时,我们必须指定水印以为Flink提供启发式方法以测量事件时间的经过。这可以是返回时间戳的任意表达式。在较高级别上,水印指定了正确性(无限期等待潜在的延迟到达)和延迟(尽可能快地产生结果)之间的折衷。

创建上表后,我们可以提交以下查询:

SELECT * FROM ItemTransactions LIMIT 10;
SELECT TUMBLE_START(event_time, INTERVAL '10' SECOND) as window_start, itemId, sum(quantity) as volume
FROM ItemTransactions
GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' SECOND);

第一个查询提供了直接的采样。使用limit子句是可选的,省略会导致结果以流方式不断更新。第二个查询实现一个简单的窗口聚合。这些查询的结果可以返回到交互式Flink SQL cli,或者可以通过INSERT INTO语句直接写入输出表。

FlinkSQL还提供了更复杂的子句,例如,可以按以下公式来查找在每10分钟的窗口中交易次数最多的前3个项目:

SELECT * FROM (
 SELECT * ,
 ROW_NUMBER() OVER (
   PARTITION BY window_start
   ORDER BY num_transactions desc
 ) AS rownum
 FROM (
   SELECT TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start, itemId, COUNT(*) AS num_transactions
   FROM ItemTransactions
   GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' MINUTE)
 )
)
WHERE rownum <=3;

除了这些内置语言元素之外,您还可以将以Java和Scala实现的功能 注册到FlinkSQL环境。

FlinkSQL还支持访问外部目录以访问存储在外部系统中的Schema和数据,当前,我们支持Hive,Kudu和Schema Registry目录。

后续步骤

在当前版本中,提交SQL查询的两个选项是使用SQL CLI或将它们包装到Java程序中。正如我们在最近的主题演讲中 所讨论的,我们正在积极开发图形用户界面,以帮助进行交互式查询编辑。

在添加GUI之后,我们将在短期内公开其针对第三方工具的编程后端,以公开与JDBC for FlinkSQL等效的接口,该接口可能更多地基于REST和Kafka构建。

更多学习资源

如果您想了解有关FlinkSQL的更多信息以及Cloudera Streaming Analytics中的更多创新的知识,请参加我们颇受欢迎的Powerchat网络研讨会系列的最后一集-Flink Power Chat 4:Apache Flink开发的最佳实践清单 。

原文链接:https://blog.cloudera.com/introducing-flinksql-in-cloudera-streaming-analytics/

原作者:Marton Balassi

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档