看了那么多的技术文,你能明白作者想让你在读完文章后学到什么吗?
大数据羊说的文章会让你明白
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
书接上回,上节说到了博主发现由于在 flink sql 中 lookup join 访问外部维表存在的性能问题。
由此诞生了一个想法,以 Redis 维表为例,Redis 支持 pipeline 批量访问模式,因此 flink sql lookup join 能不能按照 DataStream 方式一样,先攒一批数据 ,然后使用 Redis pipeline 批量访问外部存储。博主亲切的将这个功能称为 flink sql batch lookup join,本节就是讲述博主基于 flink 源码对此功能的实现。
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
来看看在具体场景下,对应输入值的输出值应该长啥样。
需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的画像(性别,年龄段)数据。
来一波输入数据:
曝光用户日志流(show_log)数据(数据存储在 kafka 中):
log_id | timestamp | user_id |
---|---|---|
1 | 2021-11-01 00:01:03 | a |
2 | 2021-11-01 00:03:00 | b |
3 | 2021-11-01 00:05:00 | c |
4 | 2021-11-01 00:06:00 | b |
5 | 2021-11-01 00:07:00 | c |
用户画像维表(user_profile)数据(数据存储在 redis 中):
user_id(主键) | age | sex |
---|---|---|
a | 12-18 | 男 |
b | 18-24 | 女 |
c | 18-24 | 男 |
注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:
user_profile redis
预期输出数据如下:
log_id | timestamp | user_id | age | sex |
---|---|---|---|---|
1 | 2021-11-01 00:01:03 | a | 12-18 | 男 |
2 | 2021-11-01 00:03:00 | b | 18-24 | 女 |
3 | 2021-11-01 00:05:00 | c | 18-24 | 男 |
4 | 2021-11-01 00:06:00 | b | 18-24 | 女 |
5 | 2021-11-01 00:07:00 | c | 18-24 | 男 |
batch lookup join sql 代码和原来的 lookup join sql 代码一模一样。如下 sql。
CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT
s.log_id as log_id
, s.`timestamp` as `timestamp`
, s.user_id as user_id
, s.proctime as proctime
, u.sex as sex
, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
可以看到 lookup join 和 batch lookup join 的代码是完全相同的,唯一的不同之处在于,batch lookup join 需要设置 table config 参数,如下图所示:
table config
将原生 lookup join 和 batch lookup join 的效果做个对比:
原生的 lookup join:每输入一条数据,访问外部维表获取到结果输出一条数据,如下图所示。
lookup join
博主实现的 batch lookup join:是每攒够 30 条数据
或者每 5s(防止数据量少的情况下,长时间不输出数据)
就利用 redis pipeline 能力访问外部存储一次。然后批量输出结果,如下图所示。大大提高了吞吐。
batch lookup join
博主将通过下面几个问题去交给大家怎么改源码去实现自己的功能。
在实现 batch lookup join 之前,当然要从原生的 lookup join 的实现开始入手,看看 flink 官方大大是怎么实现的,具体 transformation 如下图所示:
transformation
具体的实现逻辑承载在 org.apache.flink.streaming.api.operators.ProcessOperator
,org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner
中。
LookupJoinRunner 中的数据处理逻辑集中在 processElement
中。
LookupJoinRunner
可以看到上图,LookupJoinRunner 又内嵌了一层 fetcher 来实现具体的 lookup 逻辑。
原始数据 RowData
和 lookup 到的 RowData
的数据合并为 JoinedRowData
结果,然后输出。接下来详细看看 fetcher 和 collector。
transformation fetcher
把这个 fetcher 的代码 copy 出来瞅瞅。
fetcher
fetcher 内嵌了 RedisRowDataLookupFunction
来作为最终访问外部维表的函数。
访问 redis 获取到数据。
RedisRowDataLookupFunction
transformation collector
把这个 collector 的代码 copy 出来瞅瞅。
collector
是不是感觉一个 lookup join 的调用链贼复杂。
因为 batch lookup join 是完全参考 lookup join 去实现的,所以接下来博主介绍一下整体的调用链关系,这就会方便后续设计 batch lookup join 实现方案的时候去确定具体修改哪一部分代码。
调用链
整体的调用逻辑如下:
ProcessOpeartor
把 原始 RowData
传给 LookupJoinRunner
LookupJoinRunner
把 原始 RowData
传给根据 sql 代码生成的 fetcher
fetcher
中把 原始 RowData
传给 RedisRowDataLookupFunction
然后去 lookup 维表,lookup 到的结果数据为 lookup RowData
collector
把 原始 RowData
和 lookup RowData
数据合并为 JoinedRowData
然后输出。还是一样,先看看设计思路最终的结论,batch lookup join 算子调用链设计如下:
batch lookup 调用链
详细说明一下设计思路:
RedisRowDataLookupFunction
的输入需要是 List<原始 RowData>
,输出需要是 List<lookup RowData>
。其中输入数据输入到 RedisRowDataLookupFunction
中后,使用 Redis pipeline 去批量访问外部存储,然后把结果 List<lookup RowData>
输出。RedisRowDataLookupFunction
的输出数据为 List<lookup RowData>
推断出 collector
输入数据格式必然是 List<原始 RowData>
。由于在 lookup join 中 collector
的逻辑就是将 原始 RowData
和 lookup RowData
合并为 JoinedRowData
,将结果输出。因此 collector
这里就是将 List<原始 RowData>
和 List<lookup RowData>
进行遍历合并,一条一条的输出 JoinedRowData
。RedisRowDataLookupFunction
的输入数据是 fetcher
传入的,则推断出 fetcher
输入数据格式必然是 List<原始 RowData>
。fetcher
输入是 List<原始 RowData>
,则 LookupJoinRunner
输出到 fetcher
的数据也需要是 List<原始 RowData>
。但是 ProcessOpeartor
只能传给 LookupJoinRunner
原始 RowData
,因此可以得出我们的每攒 30 条数据
或者每隔 5s
的逻辑就能确定需要在 LookupJoinRunner
中做了。思路有了,那么 batch lookup join 涉及到的改动项也就能确认了。
BatchLookupJoinRunner
:实现攒批逻辑(每攒 30 条数据
或者每隔 5s
),其中攒批的数据放在 ListState 中,以防止丢失,在 table config 中的 is.dim.batch.mode
设置为 true 时使用此 BatchLookupJoinRunner
。fetcher
:将原来输入的 原始 RowData
改为 List<原始 RowData>
。RedisRowDataBatchLookupFunction
:实现将输入的批量数据 List<原始 RowData>
拿到之后使用 redis pipeline 批量访问外部存储,获取到 List<lookup RowData>
结果数据给 collector
。collector
:将原来 lookup join 中的输入 原始 RowData
,lookup RowData
改为 List<原始 RowData>
,List<lookup RowData>
,添加遍历循环 List<原始 RowData>
,List<lookup RowData>
,按顺序合并 List 中的每一项 原始 RowData
,lookup RowData
输出 JoinedRowData
的逻辑。可以看到 is.dim.batch.mode
设置为 true 时,transformation 如下。transformation 中的重点处理逻辑就是 BatchLookupJoinRunner
batch transformation
BatchLookupJoinRunner
sql 生成的 fetcher 代码如下:
fetcher
RedisRowDataBatchLookupFunction 拿到输入的 List 数据,调用 Redis pipeline 批量访问外部存储。
RedisRowDataBatchLookupFunction
sql 生成的 collector 代码如下:
collector
目前上述方案实现的不足之处如下:
每 5s
博主简单实现了下,完全基于数据驱动的每 5s 攒一批,不是基于 onTimer 驱动的。可能会出现来了一条数据之后,5 min 内都没有来数据,则数据就不输出了。is.dim.batch.mode
设置为 true,代码还按照 lookup join 的方式写即可。本文主要介绍了 flink sql batch lookup join 的使用方式,并介绍了其实现思路以及效果,主要内容如下: