数据库 StarRocks

最近更新时间:2023-06-21 15:21:54

我的收藏

介绍

flink-connector-starrocks 基于开源的 starrocks-connector-for-apache-flink v1.2.4 实现,支持通过 Flink 读写 StarRocks。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持(批数据源、维表、目的表)
1.14
支持(批数据源、维表、目的表)
1.16
不支持

作为数据目的(Sink)

flink-connector-starrocks 作为数据目的,用于导入数据至 StarRocks,相比于 Flink 官方提供的 flink-connector-jdbc,导入性能更佳,flink-connector-starrocks 的内部实现是通过缓存并批量由 Stream Load 导入。支持 cvs、json 两种数据格式。
以下为 MySQL-CDC 数据实时导入 StarRocks 的示例。
CREATE TABLE `mysql_cdc` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
'hostname' = '9.134.34.15', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
'password' = 'xxx', -- 数据库访问的密码
'database-name' = 'test', -- 需要同步的数据库
'table-name' = 'user_behavior' -- 需要同步的数据表名
);

CREATE TABLE `pk_starrocks`(
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030',
'load-url' = '172.28.28.98:8030',
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx',
'sink.buffer-flush.interval-ms' = '15000',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true', -- 需要设置为 true
-- 'sink.parallelism' = '1',
'sink.max-retries' = '3',
'sink.semantic' = 'exactly-once'
);

INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
说明:
StarRocks 表须使用 主键模型,否则源表数据删除时,无法同步到 StarRocks。
可用 StarRocks 的 SMT 工具,同步库表结构 到 StarRocks。

WITH 参数

参数
必填
默认值
数据类型
描述
connector
YES
NONE
String
固定值 starrocks
jdbc-url
YES
NONE
String
fe 节点 query_port: jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port... ,例如 jdbc:mysql://172.28.28.98:9030
load-url
YES
NONE
String
fe 节点 http_port:  fe_ip1:http_port,fe_ip2:http_port.... ,例如 172.28.28.98:8030
database-name
YES
NONE
String
starrocks 数据库名
table-name
YES
NONE
String
starrocks 表名
username
YES
NONE
String
starrocks 用户名
password
YES
NONE
String
starrocks 用户密码
sink.semantic
NO
at-least-once
String
可选值:
at-least-once 
exactly-once ,只在 checkpoint 时写数据。注意此时 sink.buffer-flush.* 相关参数无效
sink.version
NO
AUTO
String
可选值:
V1:使用 stream-load 实现 exactly-once 语义,性能比较低,适用所有的 starrocks 版本
V2:使用 transaction-load 实现 exactly-once 语义,适用 starrocks 2.4 及之后的版本
AUTO:根据 starrocks 是否支持 transaction-load 自动选择
sink.buffer-flush.max-bytes
NO
94371840(90M)
String
批量写入参数,sink.semanticat-least-once 时有效。当 buffer 中数据大小超过设置值后,触发写入 StarRocks。取值范围[64MB, 10GB]
sink.buffer-flush.max-rows
NO
500000
String
批量写入参数,sink.semanticat-least-once时有效。当 buffer 中数据条数超过设置值后,触发写入 StarRocks。取值范围[64,000, 5000,000]
sink.buffer-flush.interval-ms
NO
300000
String
批量写入参数,sink.semanticat-least-once 时有效。间隔固定的周期(毫秒)触发写入 StarRocks。取值范围 [1000, 3600000]
sink.max-retries
NO
3
String
写 StarRocks 的 stream load 请求的重试次数,取值范围 [0, 1000]
sink.parallelism
NO
NULL
String
单独指定 sink 的并行度,不设置则使用全局并行度
sink.connect.timeout-ms
NO
1000
String
连接 load-url 的超时时间(毫秒),取值范围[100, 60000]
sink.label-prefix
NO
NO
String
stream load label 的前缀,合法字符[-_A-Za-z0-9]。关于 lable,参考 stream load 可选参数
sink.properties.format
NO
CSV
String
导入 StarRocks 的数据格式,可选值 CSV 和 JSON,默认为 CSV
sink.properties.column_separator
NO
\\t
String
用于指定 CSV 格式的列分隔符,参考 CSV 适用参数
sink.properties.row_delimiter
NO
\\n
String
用于指定 CSV 格式的行分隔符,参考 CSV 适用参数
sink.properties.strip_outer_array
NO
false
String
用于 JSON 格式,指定是否裁剪最外层的数组结构。取值范围:truefalse。默认值:false。Flink 批量导入 JSON 数据在最外层有一对表示数组结构的中括号 []。因此,需要您指定该参数取值为 true,这样 StarRocks 会剪裁掉外层的中括号 [],并把中括号 [] 里的每个内层数组都作为一行单独的数据导入。如果您指定该参数取值为 false,则 StarRocks 会把整个 JSON 数据文件解析成一个数组,并作为一行数据导入。例如,待导入的 JSON 数据为 [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ],如果指定该参数取值为 true,则 StarRocks 会把 {"category" : 1, "author" : 2}{"category" : 3, "author" : 4} 解析成两行数据,并导入到目标 StarRocks 表中对应的数据行。参考 JSON 适用参数
sink.properties.*
NO
NONE
String
stream load 属性,例如 'sink.properties.columns' = 'k1, v1。自 StarRocks 2.4 开始,主键模型 支持部分列更新。更多参数请参考 stream load 文档
说明:
sink.semanticat-least-once时,sink.buffer-flush.max-bytes、sink.buffer-flush.max-rows、sink.buffer-flush.interval-ms 任意条件满足时触发 StarRocks 写操作。
sink.semanticexactly-once时,依赖 flink 的 checkpoint,在每次 checkpoint 时保存批数据以及其 label,在checkpoint 完成后的第一次 invoke 中阻塞 flush 所有缓存在 state 当中的数据,以此达到精准一次。此时 sink.buffer-flush.* 相关参数无效。

类型映射(Sink)

Flink type
StarRocks type
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INTEGER
INTEGER
BIGINT
BIGINT
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL
DECIMAL
BINARY
INT
CHAR
STRING
VARCHAR
STRING
STRING
STRING
DATE
DATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)
DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)
DATETIME
ARRAY<T>
ARRAY<T>
MAP<KT,VT>
JSON STRING
ROW<arg T...>
JSON STRING
注意:当前不支持 Flink 的 BYTES、VARBINARY、TIME、INTERVAL、MULTISET、RAW,具体可参考 Flink 数据类型

注意事项

StarRocks 数据模型

StarRocks 支持四种 数据模型:明细模型、聚合模型、更新模型、主键模型。主键模型支持谓词和索引下推,能够在支持实时和频繁更新等场景的同时,提供高效查询。如果没有特殊需求,推荐使用主键模型。
对于 Upsert 流,需要使用主键模型,否则 DELETE 消息无法写入 StarRocks。
相对于 Merge-On-Read 策略的更新模型,主键模型的查询性能能够提升 3~10 倍。
主键模型可利用部分列更新实现多流 JOIN。

作为数据源(Source)

WITH 参数

参数
必填
默认值
数据类型
描述
connector
YES
NONE
String
固定值 starrocks
scan-url
YES
NONE
String
fe 节点 http_port: fe_ip1:http_port,fe_ip2:http_port....,例如 172.28.28.98:8030
jdbc-url
YES
NONE
String
fe 节点 query_port: jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...,例如 jdbc:mysql://172.28.28.98:9030
username
YES
NONE
String
StarRocks 用户名
password
YES
NONE
String
StarRocks 用户密码
database-name
YES
NONE
String
StarRocks 数据库名
table-name
YES
NONE
String
StarRocks 表名
scan.connect.timeout-ms
NO
1000
String
网络连接超时时间(毫秒)
scan.params.keep-alive-min
NO
10
String
最大 keep alive 时间(分钟)
scan.params.query-timeout-s
NO
600(5min)
String
单次查询超时时间(秒)
scan.params.mem-limit-byte
NO
102410241024(1G)
String
单次查询内存限制
scan.max-retries
NO
1
String
重试次数
lookup.cache.ttl-ms
NO
5000
Long
维表查询的 cache 超时时间

批数据源

CREATE TABLE `starrocks` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks' ,
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
'scan-url' = '172.28.28.98:8030', -- http_port
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx'
);

CREATE TABLE `print_sink` (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT * FROM starrocks;

维表

CREATE TABLE `starrocks` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks' ,
'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port
'scan-url' = '172.28.28.98:8030', -- http_port
'database-name' = 'oceanus',
'table-name' = 'pk_user_behavior',
'username' = 'root',
'password' = 'xxx'
);

CREATE TABLE `datagen` (
`user_id` BIGINT,
`ts` as PROCTIME(),
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '20'
);

CREATE TABLE `print_sink` (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'logger'
);

INSERT INTO `print_sink`
SELECT a.user_id,b.item_id,b.behavior,a.ts
FROM `datagen` a LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts as b
ON a.user_id = b.user_id;

类型映射(source)

StarRocks
Flink
NULL
NULL
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
BIGINT
LARGEINT
STRING
FLOAT
FLOAT
DOUBLE
DOUBLE
DATE
DATE
DATETIME
TIMESTAMP
DECIMAL
DECIMAL
DECIMALV2
DECIMAL
DECIMAL32
DECIMAL
DECIMAL64
DECIMAL
DECIMAL128
DECIMAL
CHAR
CHAR
VARCHAR
STRING