版权声明:本文为博主原创文章,未经博主允许不得转载。 https://cloud.tencent.com/developer/article/1433081
本实验将应用OushuDB数据库,为一个销售订单系统建立数据仓库。通过这个简单的示例,讨论如何利用OushuDB提供的特性,在Hadoop上建立数据仓库系统。本篇说明示例的业务场景、数据仓库架构、实验环境、源和目标库的建立过程、测试数据和日期维度的生成。后面陆续进行初始ETL、定期ETL、调度ETL工作流自动执行、OLAP等实验。目的是演示以OushuDB代替传统数据仓库的具体实现过程。
示例的操作型系统是一个销售订单系统,初始时只有产品、客户、销售订单三个表,实体关系图如图1所示。
图1
该场景中的表及其属性都很简单。产品表和客户表属于基本信息表,分别存储产品和客户的信息。产品只有产品编号、产品名称、产品分类三个属性,产品编号是主键,唯一标识一个产品。客户有六个属性,除客户编号和客户名称外,还包含省、市、街道、邮编四个客户所在地区属性。客户编号是主键,唯一标识一个客户。在实际应用中,基本信息表通常由其它后台系统维护。销售订单表有六个属性,订单号是主键,唯一标识一条销售订单记录。产品编号和客户编号是两个外键,分别引用产品表和客户表的主键。另外三个属性是订单时间、登记时间和订单金额。订单时间指的是客户下订单的时间,订单金额属性指的是该笔订单需要花费的金额,这些属性的含义很清楚。订单登记时间表示订单录入的时间,大多数情况下它应该等同于订单时间。如果由于某种情况需要重新录入订单,还要同时记录原始订单的时间和重新录入的时间,或者出现某种问题,订单登记时间滞后于下订单的时间,这两个属性值就会不同。
源系统采用关系模型设计,为了减少表的数量,这个系统只做到了2NF。地区信息依赖于邮编,所以这个模型中存在传递依赖。
使用以下步骤设计数据仓库模型:
示例数据仓库的实体关系图如图2所示。
图2
“架构”是什么?这个问题从来就没有一个准确的答案。在软件行业,一种被普遍接受的架构定义是指系统的一个或多个结构。结构中包括软件的构建(构建是指软件的设计与实现),构建的外部可以看到属性以及它们之间的相互关系。参考此定义,这里把数据仓库架构理解成构成数据仓库的组件及其之间的关系,那么就有了下面的数据仓库架构图。
图3
图中显示的整个数据仓库环境包括操作型系统和数据仓库系统两大部分。操作型系统的数据经过抽取、转换和装载(ETL)过程进入数据仓库系统。这里把ETL过程分成了抽取和转换装载两个部分。抽取过程负责从操作型系统获取数据,该过程一般不做数据聚合和汇总,物理上是将操作型系统的数据全量或增量复制到数据仓库系统的RDS中。转换装载过程将数据进行清洗、过滤、汇总、统一格式化等一系列转换操作,使数据转为适合查询的格式,然后装载进数据仓库系统的TDS中。传统数据仓库的基本模式是用一些过程将操作型系统的数据抽取到文件,然后另一些过程将这些文件转化成MySQL这样的关系数据库的记录。最后,第三部分过程负责把数据导入进数据仓库。本例中的业务数据使用MySQL数据库存储。
硬件环境、软件环境、HDP与OushuDB的安装部署参见“[OushuDB入门(一)——安装篇](https://blog.csdn.net/wzy0623/article/details/79898507)”。表1汇总了各主机的角色。
主机 | IP | 角色 |
---|---|---|
hdp1 | 172.16.1.124 | Segment |
hdp2 | 172.16.1.125 | Master、Segment |
hdp3 | 172.16.1.126 | Standby、Segment |
hdp4 | 172.16.1.127 | Segment、MySQL |
表1
编辑master上的/data/hawq/master/pg\_hba.conf文件,添加dwtest用户,如图4所示。
图4
(1)设置并发连接参数
hawq config -c max_connections -v 100
hawq config -c seg_max_connections -v 1000
hawq config -c max_prepared_transactions -v 200
(2)设置资源参数
hawq config -c hawq_rm_rejectrequest_nseg_limit -v 0.25
hawq config -c hawq_global_rm_type -v none
hawq config -c hawq_rm_memory_limit_perseg -v 8GB
hawq config -c hawq_rm_nvcore_limit_perseg -v 4
(3)设置新执行器
hawq config -c new_executor -v auto
(4)重启OushuDB
hawq restart cluster
(5)查看配置
hawq config -s max_connections
hawq config -s seg_max_connections
hawq config -s max_prepared_transactions
hawq config -s hawq_rm_rejectrequest_nseg_limit
hawq config -s hawq_global_rm_type
hawq config -s hawq_rm_memory_limit_perseg
hawq config -s hawq_rm_nvcore_limit_perseg
hawq config -s new_executor
说明:
(1)用gpadmin连接OushuDB,建立用户dwtest,授予建库权限。
-- 创建用户
create role dwtest with password '123456' login createdb;
-- 授予用户创建外部表的权限
alter role dwtest with createexttable (type='readable',protocol='hdfs') ;
alter role dwtest with createexttable (type='writable',protocol='hdfs') ;
-- 查看用户
\dg
(2)测试登录
psql -U dwtest -d postgres -h hdp2
-- 查看数据库
\l
(1)将缺省的pg_default的资源限制由50%改为20%,同时将过度使用因子设置为5。
alter resource queue pg_default with
(memory_limit_cluster=20%,core_limit_cluster=20%,resource_overcommit_factor=5);
(2)建立一个dwtest用户使用的专用队列,资源限制由80%,同时将过度使用因子设置为2。
create resource queue dwtest_queue with
(parent='pg_root', memory_limit_cluster=80%, core_limit_cluster=80%,resource_overcommit_factor=2);
(3)查看资源队列配置。
select rsqname,
parentoid,
activestats,
memorylimit,
corelimit,
resovercommit,
allocpolicy,
vsegresourcequota,
nvsegupperlimit,
nvseglowerlimit,
nvsegupperlimitperseg,
nvseglowerlimitperseg
from pg_resqueue;
(4)用gpadmin将dwtest用户的资源队列设置为新建的dwtest_queue。
-- 修改用户资源队列
alter role dwtest resource queue dwtest_queue;
-- 查看用户资源队列
select rolname, rsqname from pg_roles, pg_resqueue
where pg_roles.rolresqueue=pg_resqueue.oid;
假设其他用户都使用缺省的pg\_default队列。采用以上定义,工作负载通过资源队列划分如下:
所有资源队列中虚拟段的资源限额均为缺省的256MB,每个segment可以分配32个虚拟段。并且hawq\_rm\_memory\_limit\_perseg的值设置为8GB,是256MB的32倍,每核2GB内存,这种配置防止形成资源碎片。
su - hdfs -c 'hdfs dfs -mkdir -p /data/rds'
su - hdfs -c 'hdfs dfs -chown -R gpadmin:gpadmin /data/rds'
su - hdfs -c 'hdfs dfs -chmod -R 777 /data/rds'
su - hdfs -c 'hdfs dfs -chmod -R 777 /user'
su - hdfs -c 'hdfs dfs -ls /data'
结果如图5所示。
图5
(1)执行下面的SQL语句在MySQL中建立源数据库表。
-- 建立源数据库
drop database if exists source;
create database source;
use source;
-- 建立客户表
create table customer (
customer_number int not null auto_increment primary key comment '客户编号,主键',
customer_name varchar(50) comment '客户名称',
customer_street_address varchar(50) comment '客户住址',
customer_zip_code int comment '邮编',
customer_city varchar(30) comment '所在城市',
customer_state varchar(2) comment '所在省份'
);
-- 建立产品表
create table product (
product_code int not null auto_increment primary key comment '产品编码,主键',
product_name varchar(30) comment '产品名称',
product_category varchar(30) comment '产品类型'
);
-- 建立销售订单表
create table sales_order (
order_number int not null auto_increment primary key comment '订单号,主键',
customer_number int comment '客户编号',
product_code int comment '产品编码',
order_date datetime comment '订单日期',
entry_date datetime comment '登记日期',
order_amount decimal(10 , 2 ) comment '销售金额',
foreign key (customer_number)
references customer (customer_number)
on delete cascade on update cascade,
foreign key (product_code)
references product (product_code)
on delete cascade on update cascade
);
(2)执行下面的SQL语句生成源库测试数据。
use source;
-- 生成客户表测试数据
insert into customer
(customer_name,customer_street_address,customer_zip_code,
customer_city,customer_state)
values
('really large customers', '7500 louise dr.',17050, 'mechanicsburg','pa'),
('small stores', '2500 woodland st.',17055, 'pittsburgh','pa'),
('medium retailers','1111 ritter rd.',17055,'pittsburgh','pa'),
('good companies','9500 scott st.',17050,'mechanicsburg','pa'),
('wonderful shops','3333 rossmoyne rd.',17050,'mechanicsburg','pa'),
('loyal clients','7070 ritter rd.',17055,'pittsburgh','pa'),
('distinguished partners','9999 scott st.',17050,'mechanicsburg','pa');
-- 生成产品表测试数据
insert into product (product_name,product_category)
values
('hard disk drive', 'storage'),
('floppy drive', 'storage'),
('lcd panel', 'monitor');
-- 生成100条销售订单表测试数据
drop procedure if exists generate_sales_order_data;
delimiter //
create procedure generate_sales_order_data()
begin
drop table if exists temp_sales_order_data;
create table temp_sales_order_data as select * from sales_order where 1=0;
set @start_date := unix_timestamp('2017-03-01');
set @end_date := unix_timestamp('2017-07-01');
set @i := 1;
while @i<=100 do
set @customer_number := floor(1 + rand() * 6);
set @product_code := floor(1 + rand() * 2);
set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (@i,@customer_number,@product_code,@order_date,@order_date,@amount);
set @i:=@i+1;
end while;
truncate table sales_order;
insert into sales_order
select null,customer_number,product_code,order_date,entry_date,order_amount from temp_sales_order_data order by order_date;
commit;
end
//
delimiter ;
call generate_sales_order_data();
说明:
创建了一个MySQL存储过程生成100条销售订单测试数据。为了模拟实际订单的情况,订单表中的客户编号、产品编号、订单时间和订单金额都取一个范围内的随机值,订单时间与登记时间相同。因为订单表的主键是自增的,为了保持主键值和订单时间字段的值顺序保持一致,引入了一个名为temp\_sales\_order\_data的表,存储中间临时数据。在后面章节中都是使用此方案生成订单测试数据。
(3)创建读取源数据的用户
create user 'dwtest'@'%' identified by '123456';
grant select on source.* to 'dwtest'@'%';
(1)用dwtest用户连接OushuDB
psql -U dwtest -d postgres -h hdp2
(2)建立数据库dw
create database dw;
(3)在dw库中建立模式rds和tds,结果如图6所示。
-- 连接dw数据库
\c dw
-- 创建rds模式
create schema rds;
-- 创建tds模式
create schema tds;
-- 查看模式
\dn
图6
(4)设置模式查找路径,结果如图7所示。
-- 修改数据库的模式查找路径
alter database dw set search_path to rds, tds;
-- 重新连接dw数据库
\c dw
-- 显示模式查找路径
show search_path;
图7
OushuDB的模式是数据库中对象和数据的逻辑组织。模式允许在一个数据库中有多个同名的对象,如表。如果对象属于不同的模式,同名对象之间不会冲突。使用schema有如下好处:
每个OushuDB会话在任一时刻只能连接一个数据库,因此将RDS和TDS对象存放单独的数据库显然是不合适的。这里在dw库中创建了rds、td两个模式。后面ETL时我们将用Sqoop作为数据抽取工具。Sqoop可以将关系数据库中的数据导入到HDFS或hive,但目前还没有命令行工具可以将MySQL数据直接导入到OushuDB数据库中,所以不得不将缓冲数据存储到HDFS,再利用OushuDB的外部表进行访问。OushuDB支持TXT、CSV和ORC格式的内外表,其中ORC格式的表性能最好。关于OushuDB不同格式表的性能比较,参见“[OushuDB入门(二)——性能篇](https://blog.csdn.net/wzy0623/article/details/80017447)”。我们的目标就是用ORC格式的外表存储RDS层数据,以获得最优的查询性能。Sqoop不能直接将源端的数据存储成HDFS上的ORC文件。幸运的是,Sqoop已经可以将源端数据直接导入Hive的ORC表,sqoop-import 从MySQL抽取数据,装载Hive ORC表,在平均行长50字节的情况下,1千万条数据只用两分钟,速度超预期。基于ORC格式的兼容性,OushuDB可以访问任何ORC格式的HDFS文件,当然可以访问Hive表的ORC文件。通过让OushuDB访问Hive ORC表这种方式,不需要编写额外程序,不需要用文本格式中转,并且能充分利用OushuDB ORC表的查询性能,真正做到只存储一份数据,而使用不同的引擎访问,解决了原始数据装载的问题。
这里使用两个个schema来划源数据存储和多维数据仓库的对象,不但逻辑上非常清晰,而且兼顾了ETL的处理速度。
drop table if exists customer;
-- 建立客户表
create table customer
(
customer_number int,
customer_name varchar(30),
customer_street_address varchar(30),
customer_zip_code int,
customer_city varchar(30),
customer_state varchar(2)
) stored as orc
location 'hdfs://mycluster/data/rds/customer';
drop table if exists product;
-- 建立产品表
create table product
(
product_code int,
product_name varchar(30),
product_category varchar(30)
) stored as orc
location 'hdfs://mycluster/data/rds/product';
drop table if exists sales_order;
-- 建立销售订单表
create table sales_order
(
order_number int,
customer_number int,
product_code int,
order_date timestamp,
entry_date timestamp,
order_amount double
) stored as orc
location 'hdfs://mycluster/data/rds/sales_order';
说明:
-- 设置模式查找路径
set search_path to rds;
-- 建立客户原始数据表
create external table customer
(
customer_number int,
customer_name varchar(30),
customer_street_address varchar(30),
customer_zip_code int,
customer_city varchar(30),
customer_state varchar(2)
)
location ('hdfs://mycluster/data/rds/customer')
format 'orc';
comment on table customer is '客户原始数据表';
comment on column customer.customer_number is '客户编号';
comment on column customer.customer_name is '客户姓名';
comment on column customer.customer_street_address is '客户地址';
comment on column customer.customer_zip_code is '客户邮编';
comment on column customer.customer_city is '客户所在城市';
comment on column customer.customer_state is '客户所在省份';
-- 建立产品原始数据表
create external table product
(
product_code int,
product_name varchar(30),
product_category varchar(30)
)
location ('hdfs://mycluster/data/rds/product')
format 'orc';
comment on table product is '产品原始数据表';
comment on column product.product_code is '产品编码';
comment on column product.product_name is '产品名称';
comment on column product.product_category is '产品类型';
-- 建立销售订单原始数据表
create external table sales_order
(
order_number int,
customer_number int,
product_code int,
order_date timestamp,
entry_date timestamp,
order_amount double precision
)
location ('hdfs://mycluster/data/rds/sales_order')
format 'orc';
comment on table sales_order is '销售订单原始数据表';
comment on column sales_order.order_number is '订单号';
comment on column sales_order.customer_number is '客户编号';
comment on column sales_order.product_code is '产品编码';
comment on column sales_order.order_date is '订单日期';
comment on column sales_order.entry_date is '登记日期';
comment on column sales_order.order_amount is '销售金额';
说明:
-- 设置模式查找路径
set search_path to tds;
-- 建立客户维度表
create table customer_dim (
customer_sk bigserial,
customer_number int,
customer_name varchar(50),
customer_street_address varchar(50),
customer_zip_code int,
customer_city varchar(30),
customer_state varchar(2),
isdelete boolean,
version int,
effective_date date
) format 'orc';
comment on table customer_dim is '客户维度表';
comment on column customer_dim.customer_sk is '客户维度代理键';
comment on column customer_dim.customer_number is '客户编号';
comment on column customer_dim.customer_name is '客户姓名';
comment on column customer_dim.customer_street_address is '客户地址';
comment on column customer_dim.customer_zip_code is '客户邮编';
comment on column customer_dim.customer_city is '客户所在城市';
comment on column customer_dim.customer_state is '客户所在省份';
comment on column customer_dim.isdelete is '是否删除';
comment on column customer_dim.version is '版本';
comment on column customer_dim.effective_date is '生效日期';
-- 建立产品维度表
create table product_dim (
product_sk bigserial,
product_code int,
product_name varchar(30),
product_category varchar(30),
isdelete boolean,
version int,
effective_date date
) format 'orc';
comment on table product_dim is '产品维度表';
comment on column product_dim.product_sk is '产品维度代理键';
comment on column product_dim.product_code is '产品编码';
comment on column product_dim.product_name is '产品名称';
comment on column product_dim.product_category is '产品类型';
comment on column product_dim.isdelete is '是否删除';
comment on column product_dim.version is '版本';
comment on column product_dim.effective_date is '生效日期';
-- 建立订单维度表
create table order_dim (
order_sk bigserial,
order_number int,
isdelete boolean,
version int,
effective_date date
) format 'orc';
comment on table order_dim is '订单维度表';
comment on column order_dim.order_sk is '订单维度代理键';
comment on column order_dim.order_number is '订单号';
comment on column order_dim.isdelete is '是否删除';
comment on column order_dim.version is '版本';
comment on column order_dim.effective_date is '生效日期';
-- 建立日期维度表
create table date_dim (
date_sk bigserial,
date date,
month smallint,
month_name varchar(9),
quarter smallint,
year smallint
) format 'orc';
comment on table date_dim is '日期维度表';
comment on column date_dim.date_sk is '日期维度代理键';
comment on column date_dim.date is '日期';
comment on column date_dim.month is '月份';
comment on column date_dim.month_name is '月份名称';
comment on column date_dim.quarter is '季度';
comment on column date_dim.year is '年份';
-- 建立销售订单事实表
create table sales_order_fact (
order_sk bigint,
customer_sk bigint,
product_sk bigint,
order_date_sk bigint,
year_month int,
order_amount double precision
) format 'orc'
partition by range (year_month)
( partition p201701 start (201701) inclusive ,
partition p201702 start (201702) inclusive ,
partition p201703 start (201703) inclusive ,
partition p201704 start (201704) inclusive ,
partition p201705 start (201705) inclusive ,
partition p201706 start (201706) inclusive ,
partition p201707 start (201707) inclusive ,
partition p201708 start (201708) inclusive ,
partition p201709 start (201709) inclusive ,
partition p201710 start (201710) inclusive ,
partition p201711 start (201711) inclusive ,
partition p201712 start (201712) inclusive ,
partition p201801 start (201801) inclusive ,
partition p201802 start (201802) inclusive ,
partition p201803 start (201803) inclusive ,
partition p201804 start (201804) inclusive ,
partition p201805 start (201805) inclusive ,
partition p201806 start (201806) inclusive ,
partition p201807 start (201807) inclusive ,
partition p201808 start (201808) inclusive ,
partition p201809 start (201809) inclusive ,
partition p201810 start (201810) inclusive ,
partition p201811 start (201811) inclusive ,
partition p201812 start (201812) inclusive
end (201901) exclusive );
comment on table sales_order_fact is '销售订单事实表';
comment on column sales_order_fact.order_sk is '订单维度代理键';
comment on column sales_order_fact.customer_sk is '客户维度代理键';
comment on column sales_order_fact.product_sk is '产品维度代理键';
comment on column sales_order_fact.order_date_sk is '日期维度代理键';
comment on column sales_order_fact.year_month is '年月分区键';
comment on column sales_order_fact.order_amount is '销售金额';
说明:
日期维度在数据仓库中是一个特殊角色。日期维度包含时间概念,而时间是最重要的,因为数据仓库的主要功能之一就是存储历史数据,所以每个数据仓库里的数据都有一个时间特征。本例中创建一个OushuDB的函数,预装载日期数据。注意,ORC表并不支持事务,而是与MySQL MYISAM引擎类似,每行自动提交。
-- 生成日期维度表数据的函数
create or replace function fn_populate_date (start_dt date, end_dt date)
returns void as
$$
declare
v_date date:= start_dt;
v_datediff int:= end_dt - start_dt;
begin
for i in 0 .. v_datediff
loop
insert into date_dim(date, month, month_name, quarter, year)
values(v_date, extract(month from v_date), to_char(v_date,'mon'), extract(quarter from v_date), extract(year from v_date));
v_date := v_date + 1;
end loop;
analyze date_dim;
end;
$$
language plpgsql;
-- 执行函数生成日期维度数据
select fn_populate_date(date '2000-01-01', date '2020-12-31');
-- 查询生成的日期
select min(date_sk) min_sk, min(date) min_date, max(date_sk) max_sk, max(date) max_date, count(*) c from date_dim;
ORC表的单行插入非常慢,本过程生成7千多条记录要5个多小时。所以可选的方式是用程序、MySQL过程或其它工具生成日期维度的文本文件,然后使用OushuDB的COPY命令,瞬间完成向日期维度表的数据加载,如:
copy date_dim from '/home/gpadmin/date_dim.txt' with delimiter '|';