本文主要介绍了 flink sql redis 数据汇表的实现过程。
如果想在本地测试下:
flink.examples.sql._03.source_sink.RedisSinkTest
测试类,然后使用 redis-cli 执行 get a
就可以看到结果了(目前只支持 kv,即 redis set key value
)。如果想直接在集群环境使用:
mvn package -DskipTests=true
打包flink-examples-0.0.1-SNAPSHOT.jar
引入 flink lib 中即可,无需其它设置。目前在实时计算的场景中,熟悉 datastream 的同学在很多场景下都会将结果数据写入到 redis 提供数据服务。
举个例子:
而官方是没有提供 flink sql api 的 redis sink connector 的。如下图,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
1
阿里云 flink 是提供了这个能力的。
https://www.alibabacloud.com/help/zh/faq-detail/118038.htm?spm=a2c63.q38357.a3.16.48fa711fo1gVUd
2
因此本文在介绍怎样自定义一个 sql 数据汇表的同时,实现一个 sql redis sink connector 来给大家使用。
redis 作为数据汇表在 datastream 中的最常用的数据结构有很多,基本上所有的数据结构都有可能使用到。本文实现主要实现 kv 结构,其他结构大家可以拿到源码之后进行自定义实现。也就多加几行代码就完事了。
预期效果就如阿里云的 flink redis,redis set key value
的预期 flink sql:
CREATE TABLE redis_sink_table (
key STRING, -- redis key,第 1 列为 key
`value` STRING -- redis value,第 2 列为 value
) WITH (
'connector' = 'redis', -- 指定 connector 是 redis 类型的
'hostname' = '127.0.0.1', -- redis server ip
'port' = '6379', -- redis server 端口
'write.mode' = 'string' -- 指定使用 redis `set key value`
)
INSERT INTO redis_sink_table
SELECT o.f0 as key, o.f1 as value
FROM leftTable AS o
下面是我在本地跑的结果:
3
首先看下我们的测试输入,f0
恒定为 a
,f1
恒定为 b
,并且每 10ms 写入一次:
4
预期结果是 key 为 a
,value 会为 b
,实际结果也相同,使用 redis-cli 查询下,我删除掉也能在 10ms 后写入,所以查询时可以一直查得到:
5
目前可以从网上搜到的实现、以及可以参考的实现有以下两个:
因此博主在实现时,定了一个基调。
在实现 redis 数据汇表之前,不得不谈谈 flink 数据汇表加载和使用机制。
其实上节已经详细描述了 flink sql 对于 source\sink 的加载机制。
flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)
Factory
7
8
如图 source 和 sink 是通过 FactoryUtil.createTableSource
和 FactoryUtil.createTableSink
创建的
16
所有通过 SPI 的 source\sink\formt 插件都继承自 Factory
。
整体创建 sink 方法的调用链如下图。
10
先看下博主的最终实现。
由于高度复用了 bahir redis connector,所以需要重点实现就只有两个类:
RedisDynamicTableFactory
RedisDynamicTableSink
6
具体流程:
RedisDynamicTableFactory implements DynamicTableSinkFactory
,并且在 resource\META-INF 下创建 SPI 的插件文件redis
RedisDynamicTableFactory#createDynamicTableSink
来创建对应的 source RedisDynamicTableSink
RedisDynamicTableSink implements DynamicTableSink
RedisDynamicTableFactory#getSinkRuntimeProvider
方法,创建具体的维表 UDF RichSinkFunction<T>
,这里直接服用了 bahir redis 中的 RedisSink<IN>
介绍完流程,进入具体实现方案细节:
RedisDynamicTableFactory
主要创建 sink 的逻辑:
public class RedisDynamicTableFactory implements DynamicTableSinkFactory {
...
@Override
public String factoryIdentifier() {
// 标识 redis
return "redis";
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
// 所有 option 配置的校验,比如 write.mode 类参数
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
final RedisWriteOptions redisWriteOptions = RedisOptions.getRedisWriteOptions(options);
TableSchema schema = context.getCatalogTable().getSchema();
/ 创建 RedisDynamicTableSink
return new RedisDynamicTableSink(schema.toPhysicalRowDataType()
, redisWriteOptions);
}
}
resources\META-INF 文件:
11
RedisDynamicTableSource
主要创建 table udf 的逻辑:
public class RedisDynamicTableSink implements DynamicTableSink {
...
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// 初始化 redis 客户端配置
FlinkJedisConfigBase flinkJedisConfigBase = new FlinkJedisPoolConfig.Builder()
.setHost(this.redisWriteOptions.getHostname())
.setPort(this.redisWriteOptions.getPort())
.build();
RedisMapper<RowData> redisMapper = null;
switch (this.redisWriteOptions.getWriteMode()) {
case "string":
// redis key,value 序列化器
// 从 RowData 转换成 redis 的 key value
redisMapper = new SetRedisMapper();
break;
default:
throw new RuntimeException("其他类型 write mode 请自定义实现");
}
// 创建 SinkFunction,注意!!!这里直接复用了 bahir 的实现
return SinkFunctionProvider.of(new RedisSink<>(
flinkJedisConfigBase
, redisMapper));
}
}
RedisSink
执行写入 redis 的主要流程,这里是 bahir 的实现:
public class RedisRowDataLookupFunction extends TableFunction<RowData> {
...
@Override
public void invoke(IN input) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
// 根据具体的命令执行具体写入 redis 的命令
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
case PUBLISH:
this.redisCommandsContainer.publish(key, value);
break;
case ZADD:
this.redisCommandsContainer.zadd(this.additionalKey, value, key);
break;
case ZREM:
this.redisCommandsContainer.zrem(this.additionalKey, key);
break;
case HSET:
this.redisCommandsContainer.hset(this.additionalKey, key, value);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}
@Override
public void open(Configuration parameters) throws Exception {
try {
// 初始化 redis 执行器
this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
this.redisCommandsContainer.open();
} catch (Exception e) {
LOG.error("Redis has not been properly initialized: ", e);
throw e;
}
}
}
如图是 bahir redis connector 的实现。
15
博主在实现过程中将能复用的都尽力复用。如图是最终实现目录。
12
可以看到实现非常简单。
其中 redis 客户端及其配置
、redis 命令执行器
和 redis 命令定义器
是直接复用了 bahir redis 的。如果你想要在生产环境中进行使用,可以直接将两部分代码合并,成本很低。
源码公众号后台回复flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取。
本文主要是针对 flink sql redis 数据汇表进行了扩展以及实现,并且复用 bahir redis connector 的配置,具有良好的扩展性。如果你正好需要这么一个 connector,直接公众号后台回复flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码吧。
当然上述只是 redis 数据汇表一个基础的实现,用于生产环境还有很多方面可以去扩展的。