数据库 Redis

最近更新时间:2025-10-17 18:07:11

我的收藏

介绍

Redis Connector 提供了对 Redis 写入和维表的支持。

使用范围

支持类型
维表、目标表
运行模式
API 种类
SQl Api
写入模式
根据 Command 不同,有不同写入模式。

版本说明

Flink 版本
Jedis
说明
1.11
3.5.2
支持(写入,维表)
1.13
3.5.2
支持(写入,维表)
1.14
3.5.2
支持(写入、维表)
1.16
3.5.2
支持(写入、维表)

DDL 定义

set 命令(字符串键)

-- 第1列为 key,第2列为 value。Redis 命令为 set key value
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'set', -- set 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);

lpush 命令(列表键)

-- 第1列为 key,第2列为 value。Redis 命令为 lpush key value
CREATE TABLE `redis_lpush_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'lpush', -- lpush 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


sadd 命令(集合键)

-- 第1列为 key,第2列为 value。Redis 命令为 sadd key value
CREATE TABLE `redis_sadd_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'sadd', -- sadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


hset 命令(哈希键)

-- 第1列为 hash_key,第2列为 hash_value。Redis 命令为 hset key hash_key hash_value。
CREATE TABLE `redis_hset_sink_table` (
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hset', -- hset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- 哈希键
);


HSET_WITH_KEY 命令(哈希键)

-- 第1列为 key, 第2列为 hash_key,第3列为 hash_value。Redis 命令为 hset key hash_key hash_value。
CREATE TABLE `redis_hset_sink_table` (
`key` STRING,
`hash_key` STRING,
`hash_value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hset_with_key', -- hset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>'
);


HMSET 命令

动态模式(默认)

-- 第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_value
CREATE TABLE `redis_hmset_sink_table` (
`key` STRING,
`fieldKey` STRING,
`fieldValue` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hmset', -- hmset 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>'
);

字段模式

CREATE TABLE `mysql_cdc_source_table` (
`id` INT,
`key1` STRING,
`key2` STRING,
`key3` STRING,
`key4` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
'hostname' = '10.165.16.12', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
'password' = 'Password!', -- 数据库访问的密码
'database-name' = 'mysql-cdc', -- 需要同步的数据库
'table-name' = 'source' -- 需要同步的数据表名
);
-- 默认第一个字段的值为map的key,其余字段名字为map内的字段名,值为对应的值。
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`key1` STRING,
`key2` STRING,
`key3` STRING,
`key4` STRING,
PRIMARY KEY (`key`) NOT ENFORCED -- 没定义主键第一个字段是主键
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'hmset', -- set 命令
'nodes' = '10.165.16.19:6379', -- redis 连接地址
'password' = 'Password!',
'database' = '0',
'schema-syntax' = 'fields'
);
insert into
`redis_set_sink_table`
select
CONCAT(CAST(mysql_cdc_source_table.id AS STRING), ':', 'HID'),
`mysql_cdc_source_table`.`key1`,
`mysql_cdc_source_table`.`key2`,
`mysql_cdc_source_table`.`key3`,
`mysql_cdc_source_table`.`key4`,
from
mysql_cdc_source_table

zadd 命令(有序集合键)

-- 第1列为 hash_key,第2列为 hash_value。zadd key score value。
CREATE TABLE `redis_zadd_sink_table` (
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'zadd', -- zadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
'additional-key' = '<key>' -- 有序集合键
);


ZADD_WITH_KEY 命令(有序集合键)

-- 第1列为 key, 第2列为 value,第3列为 score。zadd key score value。
CREATE TABLE `redis_zadd_sink_table` (
`key` STRING,
`value` STRING,
`score` DOUBLE
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'zadd_with_key', -- zadd 命令
'nodes' = '<host>:<port>', -- redis 连接地址
'password' = '<password>',
'database' = '<database>',
);

get 命令(维表)

CREATE TABLE `redis_dimension_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis',
'command' = 'get', -- get命令
'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。
'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。
'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。
'lookup.max-retries' = '5', -- 查询失败,最多重试次数。
-- 'password' = '<password>', -- 可选参数,密码
-- 'database' = '<database>', -- 可选参数,数据库:默认0
-- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式)
);

HGETALL 命令(维表)

CREATE TABLE datagen_source_table (
proc_time AS PROCTIME(),
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- 每秒产生的数据条数
'fields.id.kind'='random',
'fields.id.min'='1',
'fields.id.max'='3'
);
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'set', -- set 命令
'nodes' = '10.165.16.19:6379', -- redis 连接地址
'password' = 'Password!',
'database' = '0'
);
CREATE TABLE `redis_dimension_table` (
`id` STRING, --第一个字段作为join的on条件会作为redis map的名字去查询redis得到map。
`name` STRING, --其它参数作为map中的字段名
`age` String
) WITH (
'connector' = 'redis',
'command' = 'hgetall', -- hgetall 命令
'nodes' = '<host>:<port>', -- redis连接地址,集群模式多个节点使用'',''分隔。
'lookup.cache.max-rows' = '500', -- 查询缓存中最多缓存的数据条数。
'lookup.cache.ttl' = '10 min', -- 每条记录的最长缓存时间。
'lookup.max-retries' = '5', -- 查询失败,最多重试次数。
-- 'password' = '<password>', -- 可选参数,密码
-- 'database' = '<database>', -- 可选参数,数据库:默认0
-- 'redis-mode' = 'standalone' -- 可选参数,redis 部署模式,默认standalone。(可选:standalone 单机模式,cluster 集群模式)
);
-- source:{id:"1",name:"sadasdinrginrdogndlkfgndfg"}
-- 维表: redis名字为1的map{"name":"liming","age":"18"}
-- sink:{key:"1:liming:18","value":"sadasdinrginrdogndlkfgndfg"}
insert into
`redis_set_sink_table`
select
CONCAT(CAST(datagen_source_table.id AS STRING), ':', `redis_dimension_table`.name, ':', `redis_dimension_table`.age) AS `key`,
`datagen_source_table`.`name` AS `value`
from
datagen_source_table
join
`redis_dimension_table` for SYSTEM_TIME AS OF datagen_source_table.proc_time
ON
CAST(datagen_source_table.id AS STRING) = `redis_dimension_table`.id;

WITH 参数

类型
参数
数据类型
是否必填
默认值
说明
通用
connector
String
-
固定值为 redis
command
String
-
操作命令。取值与对应的键类型如下:
SET:字符串键
LPUSH:类别键
SADD:集合键
HSET:哈希键
HSET_WITH_KEY:哈希键
ZADD:有序集合键
ZADD_WITH_KEY:有序集合键
HMSET
HGETALL
nodes
String
-
redis server 连接地址,示例:127.0.0.1:6379。集群架构下多个节点使用','分隔
password
String
redis 密码,默认值为空,不进行权限验证
database
String
0
要操作的数据库的 DB number,默认值0
schema-syntax
String
-
以下命令可以使用:
-fields : HMSET command可以使用字段模式
目标表
ignore-delete
String
false
是否忽略 Retraction 消息
additional-ttl
String
-
Redis Key的过期时间,单位:秒。示例:60,设置为60秒。
支持的命令:
SET
HSET_WITH_KEY
HSET
HMSET
additional-key
String
-
用于指定 hset 和 zadd 的 key。执行 hset 和 zadd 命令时必须设置
维表
lookup.cache.max-rows
String
-
查询缓存中最多缓存的数据条数
lookup.cache.ttl
String
10s
每条记录的最长缓存时间
lookup.max-retries
String
1
查询失败最多重试次数
lookup.cache.caching-missing-key
String
true
是否缓存空结果

代码示例

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到 Redis
'command' = 'set', -- set 命令
'nodes' = '127.0.0.1:8121', -- redis 连接地址
'password' = '<password>',
'database' = '0'
);
insert into redis_set_sink_table select cast(id as string), name from datagen_source_table;

注意事项

1. 自建 Redis 的集群模式不支持多数据库,集群模式下 database 参数无效。
2. 腾讯云数据库 Redis 的集群架构支持多数据库,可以使用 standalone 模式指定写入云数据库 Redis 集群架构中的非0数据库,例如:
CREATE TABLE `redis_set_sink_table` (
`key` STRING,
`value` STRING
) WITH (
'connector' = 'redis', -- 输出到Redis
'command' = 'set', -- set命令
'nodes' = '<host>:<port>', -- redis连接地址
'password' = '<password>',
'redis-mode' = 'standalone',
'database' = '1'
);