前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「硬刚Doris系列」Doris高级用法

「硬刚Doris系列」Doris高级用法

作者头像
王知无-import_bigdata
发布2022-06-05 10:31:37
1.5K0
发布2022-06-05 10:31:37
举报

1.1 添加Rollup

Rollup 可以理解为 Table 的一个物化索引结构。物化 是因为其数据在物理上独立存储,而 索引 的意思是,Rollup可以调整列顺序以增加前缀索引的命中率,也可以减少key列以增加数据的聚合度。

以下举例说明。

原表table1的Schema如下:

代码语言:javascript
复制
+----------+-------------+------+-------+---------+-------+
| Field   | Type       | Null | Key   | Default | Extra |
+----------+-------------+------+-------+---------+-------+
| siteid   | int(11)     | No   | true | 10     |       |
| citycode | smallint(6) | No   | true | N/A     |       |
| username | varchar(32) | No   | true |         |       |
| pv       | bigint(20) | No   | false | 0       | SUM   |
| uv       | bigint(20) | No   | false | 0       | SUM   |
+----------+-------------+------+-------+---------+-------+

对于 table1 明细数据是 siteid, citycode, username 三者构成一组 key,从而对 pv 字段进行聚合;如果业务方经常有看城市 pv 总量的需求,可以建立一个只有 citycode, pv 的rollup。

代码语言:javascript
复制
ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);

1.2 Broadcast/Shuffle Join

系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。

如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。

Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。

使用 Broadcast Join(默认):

代码语言:javascript
复制
mysql> select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.20 sec)

使用 Broadcast Join(显式指定):

代码语言:javascript
复制
mysql> select sum(table1.pv) from table1 join [broadcast] table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.20 sec)

使用 Shuffle Join:

代码语言:javascript
复制
mysql> select sum(table1.pv) from table1 join [shuffle] table2 where table1.siteid = 2;
+--------------------+
| sum(`table1`.`pv`) |
+--------------------+
|                 10 |
+--------------------+
1 row in set (0.15 sec)

1.3 Colocation Join

1.3.1 名词解释
  • FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。
  • BE:Backend,Doris 的后端节点。负责查询执行和数据存储。
  • Colocation Group(CG):一个 CG 中会包含一张及以上的 Table。在同一个 Group 内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布。
  • Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及副本数等。
1.3.2 原理

doris 除了支持Broadcast/Shuffle Join 之外,Colocation Join更是一大特色。Colocation Join 功能,是将一组拥有相同 CGS 的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。

为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证以下属性相同:

  • 分桶列和分桶数 分桶列,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。分桶列决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Tablet 中。同一 CG 内的 Table 必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
  • 副本数 同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个 Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。

同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。

1.3.3 举例说明
代码语言:javascript
复制
CREATE TABLE `tbl1` (
    `k1` date NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
    PARTITION p1 VALUES LESS THAN ('2019-05-31'),
    PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);
代码语言:javascript
复制
CREATE TABLE `tbl2` (
    `k1` datetime NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);

查看查询计划,如果 Colocation Join 生效,则 Hash Join 节点会显示 colocate: true。

代码语言:javascript
复制
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String                                     |
+----------------------------------------------------+
| PLAN FRAGMENT 0                                    |
|  OUTPUT EXPRS:`tbl1`.`k1` |                        |
|   PARTITION: RANDOM                                |
|                                                    |
|   RESULT SINK                                      |
|                                                    |
|   2:HASH JOIN                                      |
|   |  join op: INNER JOIN                           |
|   |  hash predicates:                              |
|   |  colocate: true                                |
|   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
|   |  tuple ids: 0 1                                |
|   |                                                |
|   |----1:OlapScanNode                              |
|   |       TABLE: tbl2                              |
|   |       PREAGGREGATION: OFF. Reason: null        |
|   |       partitions=0/1                           |
|   |       rollup: null                             |
|   |       buckets=0/0                              |
|   |       cardinality=-1                           |
|   |       avgRowSize=0.0                           |
|   |       numNodes=0                               |
|   |       tuple ids: 1                             |
|   |                                                |
|   0:OlapScanNode                                   |
|      TABLE: tbl1                                   |
|      PREAGGREGATION: OFF. Reason: No AggregateInfo |
|      partitions=0/2                                |
|      rollup: null                                  |
|      buckets=0/0                                   |
|      cardinality=-1                                |
|      avgRowSize=0.0                                |
|      numNodes=0                                    |
|      tuple ids: 0                                  |
+----------------------------------------------------+

1.4 动态分区

1.4.1 原理

在某些使用场景下,用户会将表按照天进行分区划分,每天定时执行例行任务,这时需要使用方手动管理分区,否则可能由于使用方没有创建分区导致数据导入失败,这给使用方带来了额外的维护成本。

在实现方式上, FE会启动一个后台线程,根据fe.conf中dynamic_partition_enable 及 dynamic_partition_check_interval_seconds参数决定该线程是否启动以及该线程的调度频率。每次调度时,会在注册表中读取动态分区表的属性,并根据动态分区属性动态添加及删除分区。

1.4.2 举例说明

建表时,可以在 PROPERTIES 中指定以下dynamic_partition属性,表示这个表是一个动态分区表。

代码语言:javascript
复制
CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
PARTITION p20200321 VALUES LESS THAN ("2020-03-22"),
PARTITION p20200322 VALUES LESS THAN ("2020-03-23"),
PARTITION p20200323 VALUES LESS THAN ("2020-03-24"),
PARTITION p20200324 VALUES LESS THAN ("2020-03-25")
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
 );

创建一张动态分区表,指定开启动态分区特性,以当天为2020-03-25为例,在每次调度时,会删除分区上界小于 2020-03-22 的分区,为了避免删除非动态创建的分区,动态删除分区只会删除分区名符合动态创建分区规则的分区,例如分区名为a1, 则即使分区范围在待删除的分区范围内,也不会被删除。同时在调度时会提前创建今天以及以后3天(总共4天)的分区(若分区已存在则会忽略),分区名根据指定前缀分别为p20200325 p20200326p20200327 p20200328,每个分区的分桶数量为32。同时会删除 p20200321 的分区。

1.4.3 分区属性参数

dynamic_partition.enable: 是否开启动态分区特性,可指定为 TRUE 或 FALSE。如果不填写,默认为 TRUE。

dynamic_partition.time_unit: 动态分区调度的单位,可指定为 DAY WEEK MONTH,当指定为 DAY时,动态创建的分区名后缀格式为yyyyMMdd,例如20200325。当指定为 WEEK 时,动态创建的分区名后缀格式为yyyy_ww即当前日期属于这一年的第几周,例如 2020-03-25 创建的分区名后缀为 2020_13, 表明目前为2020年第13周。当指定为 MONTH 时,动态创建的分区名后缀格式为 yyyyMM,例如 202003。

dynamic_partition.start: 动态分区的开始时间, 以当天为基准,超过该时间范围的分区将会被删除。如果不填写,则默认为Integer.MIN_VALUE 即 -2147483648。

dynamic_partition.end: 动态分区的结束时间, 以当天为基准,会提前创建N个单位的分区范围。

dynamic_partition.prefix: 动态创建的分区名前缀。

dynamic_partition.buckets: 动态创建的分区所对应的分桶数量。

1.5 支持Bitmap

使用 Roaring Bitmap 数据结构,现场查询时的 IO,CPU,内存,网络资源会显著减少,并且不会随着数据规模线性增加。

代码语言:javascript
复制
CREATE TABLE `pv_bitmap` (
`dt` int,
`page` varchar(10),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 2;
代码语言:javascript
复制
select bitmap_count(bitmap_union(user_id)) from pv_bitmap; 
select bitmap_union_count(user_id) from pv_bitmap;
select bitmap_union_int(id) from pv_bitmap; 
BITMAP_UNION(expr) : 计算两个 Bitmap 的并集,返回值是序列化后的 Bitmap 值 
BITMAP_COUNT(expr) : 计算 Bitmap 的基数值 
BITMAP_UNION_COUNT(expr): 和 BITMAP_COUNT(BITMAP_UNION(expr)) 等价 
BITMAP_UNION_INT(expr) : 和 COUNT(DISTINCT expr) 等价 (仅支持 TINYINT,SMALLINT 和 INT)

1.6 物化视图

物化视图是将预先计算(根据定义好的 SELECT 语句)好的数据集,存储在 Doris 中的一个特殊的表。

物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询。

在没有物化视图功能之前,用户一般都是使用 Rollup 功能通过预聚合方式提升查询效率的。但是 Rollup 具有一定的局限性,他不能基于明细模型做预聚合。

物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。

也就是说,之前 ALTER TABLE ADD ROLLUP 语法支持的功能现在均可以通过 CREATE MATERIALIZED VIEW 实现。

代码语言:javascript
复制
create materialized view store_amt as
select store_id, sum(sale_amt) from sales_records group by store_id;
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.1 添加Rollup
  • 1.2 Broadcast/Shuffle Join
  • 1.3 Colocation Join
  • 1.4 动态分区
  • 1.5 支持Bitmap
  • 1.6 物化视图
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档