介绍
Redis Connector 提供了对 Redis 写入和维表的支持。
使用范围
支持类型 | 维表、目标表 |
运行模式 | 流 |
API 种类 | SQl Api |
写入模式 | 根据 Command 不同,有不同写入模式。 |
版本说明
Flink 版本 | Jedis | 说明 |
1.11 | 3.5.2 | 支持(写入,维表) |
1.13 | 3.5.2 | 支持(写入,维表) |
1.14 | 3.5.2 | 支持(写入、维表) |
1.16 | 3.5.2 | 支持(写入、维表) |
DDL 定义
set 命令(字符串键)
-- 第1列为 key,第2列为 value。Redis 命令为 set key valueCREATE TABLE `redis_set_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'set', -- set 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>');
lpush 命令(列表键)
-- 第1列为 key,第2列为 value。Redis 命令为 lpush key valueCREATE TABLE `redis_lpush_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'lpush', -- lpush 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>');
sadd 命令(集合键)
-- 第1列为 key,第2列为 value。Redis 命令为 sadd key valueCREATE TABLE `redis_sadd_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'sadd', -- sadd 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>');
hset 命令(哈希键)
-- 第1列为 hash_key,第2列为 hash_value。Redis 命令为 hset key hash_key hash_value。CREATE TABLE `redis_hset_sink_table` (`hash_key` STRING,`hash_value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'hset', -- hset 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>','additional-key' = '<key>' -- 哈希键);
HSET_WITH_KEY 命令(哈希键)
-- 第1列为 key, 第2列为 hash_key,第3列为 hash_value。Redis 命令为 hset key hash_key hash_value。CREATE TABLE `redis_hset_sink_table` (`key` STRING,`hash_key` STRING,`hash_value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'hset_with_key', -- hset 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>');
HMSET 命令
动态模式(默认)
-- 第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_valueCREATE TABLE `redis_hmset_sink_table` (`key` STRING,`fieldKey` STRING,`fieldValue` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'hmset', -- hmset 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>');
字段模式
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`key1` STRING,`key2` STRING,`key3` STRING,`key4` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '10.165.16.12', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'Password!', -- 数据库访问的密码'database-name' = 'mysql-cdc', -- 需要同步的数据库'table-name' = 'source' -- 需要同步的数据表名);-- 默认第一个字段的值为map的key,其余字段名字为map内的字段名,值为对应的值。CREATE TABLE `redis_set_sink_table` (`key` STRING,`key1` STRING,`key2` STRING,`key3` STRING,`key4` STRING,PRIMARY KEY (`key`) NOT ENFORCED -- 没定义主键第一个字段是主键) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'hmset', -- set 命令'nodes' = '10.165.16.19:6379', -- redis 连接地址'password' = 'Password!','database' = '0','schema-syntax' = 'fields');insert into`redis_set_sink_table`selectCONCAT(CAST(mysql_cdc_source_table.id AS STRING), ':', 'HID'),`mysql_cdc_source_table`.`key1`,`mysql_cdc_source_table`.`key2`,`mysql_cdc_source_table`.`key3`,`mysql_cdc_source_table`.`key4`,frommysql_cdc_source_table
zadd 命令(有序集合键)
-- 第1列为 hash_key,第2列为 hash_value。zadd key score value。CREATE TABLE `redis_zadd_sink_table` (`value` STRING,`score` DOUBLE) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'zadd', -- zadd 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>','additional-key' = '<key>' -- 有序集合键);
ZADD_WITH_KEY 命令(有序集合键)
-- 第1列为 key, 第2列为 value,第3列为 score。zadd key score value。CREATE TABLE `redis_zadd_sink_table` (`key` STRING,`value` STRING,`score` DOUBLE) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'zadd_with_key', -- zadd 命令'nodes' = '<host>:<port>', -- redis 连接地址'password' = '<password>','database' = '<database>',);
get 命令(维表)
CREATE TABLE `redis_dimension_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis','command' = 'get', -- get命令'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。'lookup.max-retries' = '5', -- 查询失败,最多重试次数。-- 'password' = '<password>', -- 可选参数,密码-- 'database' = '<database>', -- 可选参数,数据库:默认0-- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式));
HGETALL 命令(维表)
CREATE TABLE datagen_source_table (proc_time AS PROCTIME(),id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1', -- 每秒产生的数据条数'fields.id.kind'='random','fields.id.min'='1','fields.id.max'='3');CREATE TABLE `redis_set_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'set', -- set 命令'nodes' = '10.165.16.19:6379', -- redis 连接地址'password' = 'Password!','database' = '0');CREATE TABLE `redis_dimension_table` (`id` STRING, --第一个字段作为join的on条件会作为redis map的名字去查询redis得到map。`name` STRING, --其它参数作为map中的字段名`age` String) WITH ('connector' = 'redis','command' = 'hgetall', -- hgetall 命令'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。'lookup.max-retries' = '5', -- 查询失败,最多重试次数。-- 'password' = '<password>', -- 可选参数,密码-- 'database' = '<database>', -- 可选参数,数据库:默认0-- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式));-- source:{id:"1",name:"sadasdinrginrdogndlkfgndfg"}-- 维表: redis名字为1的map{"name":"liming","age":"18"}-- sink:{key:"1:liming:18","value":"sadasdinrginrdogndlkfgndfg"}insert into`redis_set_sink_table`selectCONCAT(CAST(datagen_source_table.id AS STRING), ':', `redis_dimension_table`.name, ':', `redis_dimension_table`.age) AS `key`,`datagen_source_table`.`name` AS `value`fromdatagen_source_tablejoin`redis_dimension_table` for SYSTEM_TIME AS OF datagen_source_table.proc_timeONCAST(datagen_source_table.id AS STRING) = `redis_dimension_table`.id;
WITH 参数
类型 | 参数 | 数据类型 | 是否必填 | 默认值 | 说明 |
通用 | connector | String | 是 | - | 固定值为 redis |
| command | String | 是 | - | 操作命令。取值与对应的键类型如下: SET:字符串键 LPUSH:类别键 SADD:集合键 HSET:哈希键 HSET_WITH_KEY:哈希键 ZADD:有序集合键 ZADD_WITH_KEY:有序集合键 HMSET HGETALL |
| nodes | String | 是 | - | redis server 连接地址,示例:127.0.0.1:6379。集群架构下多个节点使用','分隔 |
| password | String | 否 | 空 | redis 密码,默认值为空,不进行权限验证 |
| database | String | 否 | 0 | 要操作的数据库的 DB number,默认值0 |
| schema-syntax | String | 否 | - | 以下命令可以使用: -fields : HMSET command可以使用字段模式 |
目标表 | ignore-delete | String | 否 | false | 是否忽略 Retraction 消息 |
| additional-ttl | String | 否 | - | Redis Key的过期时间,单位:秒。示例:60,设置为60秒。 支持的命令: SET HSET_WITH_KEY HSET HMSET |
| additional-key | String | 否 | - | 用于指定 hset 和 zadd 的 key。执行 hset 和 zadd 命令时必须设置 |
维表 | lookup.cache.max-rows | String | 否 | - | 查询缓存中最多缓存的数据条数 |
| lookup.cache.ttl | String | 否 | 10s | 每条记录的最长缓存时间 |
| lookup.max-retries | String | 否 | 1 | 查询失败最多重试次数 |
| lookup.cache.caching-missing-key | String | 否 | true | 是否缓存空结果 |
代码示例
CREATE TABLE datagen_source_table (id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1' -- 每秒产生的数据条数);CREATE TABLE `redis_set_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到 Redis'command' = 'set', -- set 命令'nodes' = '127.0.0.1:8121', -- redis 连接地址'password' = '<password>','database' = '0');insert into redis_set_sink_table select cast(id as string), name from datagen_source_table;
注意事项
1. 自建 Redis 的集群模式不支持多数据库,集群模式下 database 参数无效。
2. 腾讯云数据库 Redis 的集群架构支持多数据库,可以使用 standalone 模式指定写入云数据库 Redis 集群架构中的非0数据库,例如:
CREATE TABLE `redis_set_sink_table` (`key` STRING,`value` STRING) WITH ('connector' = 'redis', -- 输出到Redis'command' = 'set', -- set命令'nodes' = '<host>:<port>', -- redis连接地址'password' = '<password>','redis-mode' = 'standalone','database' = '1');