数据库 Doris

最近更新时间:2023-07-27 10:31:21

我的收藏

介绍

Flink Connector Doris 目前支持通过 Flink 将数据写入 Doris,基于 开源版本 实现。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
不支持
1.16
支持(默认打开两阶段提交,即 2PC 写)

使用范围

Flink Connector Doris 目前仅支持 Doris sink。支持的 Doris 版本为0.14.0及以上版本,并且要求开启配置 enable_http_server_v2 = true

DDL 定义

注意:
Flink 1.13、Flink 1.16 的 DDL 参数不同,请选择对应的版本使用。

作为数据目的地 Sink(Flink 1.13)

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'sink.batch.size' = '500', -- 单次写BE的最大行数
'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
);

作为数据目的地 Sink (Flink 1.16)

CREATE TABLE doris_sink_table (
id INT,
name VARCHAR
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password' -- 访问Doris的密码
);

-- 注意: 默认打开 2PC 两阶段提交写入

作为 Catalog

CREATE CATALOG doris_catalog WITH (
'type' = 'doris',
'fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'default-database' = 'default'
)

WITH 参数 (1.13 版本)

Sink

参数
说明
是否必填
备注
connector
源表类型
固定值 doris
fenodes
Doris FE HTTP 地址
-
table.identifier
Doris 表名,格式:db1.tbl1
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
sink.batch.size
单次写 BE 的最大行数
默认100
sink.max-retries
写 BE 失败之后的重试次数
默认1
sink.batch.interval
flush 间隔时间,超过该时间后异步线程将缓存中数据写入 BE。默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0,表示关闭定期写入
默认1s
sink.properties.*
Stream load 的导入 参数。例如 sink.properties.column_separator' = ','
-
sink.enable-2pc
是否采用事务写入
false

Catalog

参数
说明
是否必填
备注
type
-
固定值 doris
fenodes
Doris FE HTTP 地址
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
default-database
默认的database
-

WITH 参数 (1.16 版本)

Sink

参数
说明
是否必填
备注
connector
源表类型
固定值 doris
fenodes
Doris FE HTTP 地址
-
table.identifier
Doris 表名,格式:db1.tbl1
-
username
访问 Doris 的用户名
-
password
访问 Doris 的密码
-
sink.enable-2pc
是否采用事务写入
true
sink.properties.*
Stream load 的导入 参数。例如 sink.properties.column_separator' = ','
-
更多参数说明请参见 Flink Doris Connector

类型映射

Doris 字段类型
Flink 字段类型
NULL_TYPE
NULL
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
BIGINT
FLOAT
FLOAT
DOUBLE
DOUBLE
TIME
DATE
DATE
DATETIME
TIMESTAMP
CHAR
STRING
LARGEINT
VARCHAR
DECIMAL
DECIMAL
DECIMALV2
HLL
Unsupported datatype

代码示例

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

CREATE TABLE doris_sink_table (
id INT,
name STRING
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'sink.batch.size' = '500', -- 单次写BE的最大行数
'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
);

INSERT INTO doris_sink_table select * from datagen_source_table;
CREATE CATALOG doris_catalog WITH (
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'username' = 'root', -- 访问Doris的用户名,拥有库的写权限
'password' = 'password', -- 访问Doris的密码
'default-database' = 'default'
);

CREATE TABLE datagen_source_table (
id INT,
name STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='1' -- 每秒产生的数据条数
);

INSERT INTO `doris_catalog`.`my_database`.`my_table` SELECT * FROM.datagen_source_table;
MySQL-CDC 对接 Doris 代码示例
--mysql cdc 源表
CREATE TABLE `mysql_cdc_source_table` (
`id` INT NOT NULL,
`name` VARCHAR,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
'hostname' = 'YourHostName', -- 数据库的 IP
'port' = '3306', -- 数据库的访问端口
'username' = 'YourUserName', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
'password' = 'YourPassword', -- 数据库访问的密码
'database-name' = 'YourDatabase', -- 需要同步的数据库
'table-name' = 'YourTable' -- 需要同步的数据表名
);

--写入doris表
CREATE TABLE `print_table` (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'doris', -- 固定值 'doris'
'fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址
'table.identifier' = 'dbName.tableName', -- Doris 表名 格式:db.tbl
'username' = 'YourUserName', -- 访问Doris的用户名,拥有库的写权限
'password' = 'YourPassword', -- 访问Doris的密码
'sink.batch.size' = '500', -- 单次写BE的最大行数
'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
);

insert into print_table
select id,name from mysql_cdc_source_table;

注意事项

Upsert

若需要 Upsert ,则要求 Doris 表必须是 Uniqe 模型或者 Aggregate 模型。建表示例如下:
-- Uniqe 模型建表语句
CREATE TABLE `doris_sink_table` (
`id` int(11),
`name` varchar(32)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES("replication_num" = "3");

-- Aggregate 模型建表语句
CREATE TABLE `doris_sink_table` (
`id` int(11),
`name` varchar(32) REPLACE DEFAULT '0'
)
AGGREGATE KEY('id')
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES("replication_num" = "3"); -- 注意若 BE 节点不够,会报 `Failed to find enough host in all backends` 错误,可适当减少该值。

用户权限

用户必须拥有对应的库的写权限。
CREATE USER 'test' IDENTIFIED BY 'test_passwd';
GRANT ALL ON test TO test;