看了那么多的技术文,你能明白作者想让你在读完文章后学到什么吗?
大数据羊说的文章会让你明白
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
local cache
,异步访问维表
,批量访问维表
三种方式去解决性能问题。批量访问维表
的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。维表作为 sql 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 join 语句中进行使用。比如源数据有人的 id,你现在想要得到人的性别、年龄,那么可以通过用户 id 去关联人的性别、年龄,就可以得到更全的数据。
维表 join 在离线数仓中是最常见的一种数据处理方式了,在实时数仓的场景中,flink sql 目前也支持了维表的 join,即 lookup join,生产环境可以用 mysql,redis,hbase 来作为高速维表存储引擎。
Notes: 在实时数仓中,常用实时维表有两种更新频率
来看看在具体场景下,对应输入值的输出值应该长啥样。
需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。此处我们只关心关联维表这一部分的输入输出数据。
来一波输入数据:
曝光用户日志流(show_log)数据(数据存储在 kafka 中):
log_id | timestamp | user_id |
---|---|---|
1 | 2021-11-01 00:01:03 | a |
2 | 2021-11-01 00:03:00 | b |
3 | 2021-11-01 00:05:00 | c |
4 | 2021-11-01 00:06:00 | b |
5 | 2021-11-01 00:07:00 | c |
用户画像维表(user_profile)数据(数据存储在 redis 中):
user_id(主键) | age | sex |
---|---|---|
a | 12-18 | 男 |
b | 18-24 | 女 |
c | 18-24 | 男 |
注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:
user_profile redis
预期输出数据如下:
log_id | timestamp | user_id | age | sex |
---|---|---|---|---|
1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
flink sql lookup join 登场。下面是官网的链接。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join
以上述案例来说,lookup join 其实简单理解来,就是每来一条数据去 redis 里面搂一次数据。然后把关联到的维度数据给拼接到当前数据中。
熟悉 DataStream api 的小伙伴萌,简单来理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中处理每一条来的数据,针对每一条数据去访问用户画像的 redis。(实际上,flink sql api 中也确实是这样实现的!sql 生成的 lookup join 代码就是继承了 flatmap)
来看看上述案例的 flink sql lookup join sql 怎么写:
CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT
s.log_id as log_id
, s.`timestamp` as `timestamp`
, s.user_id as user_id
, s.proctime as proctime
, u.sex as sex
, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
这里使用了 for SYSTEM_TIME as of
时态表的语法来作为维表关联的标识语法。
Notes: 实时的 lookup 维表关联能使用处理时间去做关联。
运行结果如下:
log_id | timestamp | user_id | age | sex |
---|---|---|---|---|
1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
flink web ui 算子图如下:
flink web ui
但是!!!但是!!!但是!!!
flink 官方并没有提供 redis 的维表 connector 实现。
没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!
flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。
举个例子:
0.1 ms
,那么并行度为 1
的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps
。2 ms
,那么一条数据从输入 flink 任务到输出 flink 任务的时延就会变成 2.1 ms
,那么同样并行度为 1
的任务的吞吐只能达到 1 query / 2.1 ms = 476 qps
。两者的吞吐量相差 21 倍
。这就是为什么维表 join 的算子会产生背压,任务产出会延迟。
那么当然,解决方案也是有很多的。抛开 flink sql 想一下,如果我们使用 DataStream api,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:
10 / 2.1 ms = 4761 qps
。50 query / 7 ms = 7143 qps
。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。redis pipeline
博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。
既然 DataStream 可以这样做,flink sql 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作
lookup.async
。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/先描述一下大概是个什么东西,具体怎么用。
你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode
为 true
,sql 不用做任何改动的情况下,flink lookup join 算子会自动优化,优化效果如下:
lookup join 算子的每个 task 上,每攒够 30 条数据
or 每隔五秒(处理时间)
去触发一次批量访问 redis 的请求,使用的是 jedis client 的 pipeline 功能访问 redis server。实测性能有很大提升。
关于这个批量访问机制的优化介绍和使用方式介绍,小伙伴们先别急,下篇文章会详细介绍到。
本文主要介绍了 flink sql lookup join 的使用方式,并介绍了一些经常出现的性能问题以及优化思路,总结如下:
local cache
,异步访问维表
,批量访问维表
三种方式去解决性能问题。批量访问维表
的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。