
Data Warehouse Practice Based on the Hadoop Ecosystem: Advanced Techniques (15)

Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://cloud.tencent.com/developer/article/1433131

XV. Dimension Consolidation

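As the number of dimensions in a data warehouse grows, some common data turns up in more than one dimension. For example, the customer address information and shipping address information in the customer dimension, and the factory dimension, all carry zip code, city and state. This section shows how to consolidate the zip-code-related information from these three places into one new dimension.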

1. Modifying the Data Warehouse Schema

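Consolidating the dimensions requires a change to the data warehouse schema. The figure below shows the modified schema. A new zip_code_dim table is added, and the structures of the sales_order_fact and production_fact tables are changed accordingly. Note that the figure shows only the tables related to the zip code dimension.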
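The zip_code_dim table is related to both fact tables. These relationships replace the zip code information that the two fact tables previously reached through the customer dimension and the factory dimension. The sales_order_fact table needs two relationships, one to the customer address zip code and one to the shipping address zip code. The production_fact table needs only one, so only a factory address zip code surrogate key is added to that fact table.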
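The idea of one physical zip code table playing two roles for the same fact table can be tried out on a small scale. The following is a minimal sketch, assuming Python's built-in sqlite3 module instead of Hive; the trimmed three-column fact table, the sample order numbers and the function names are made up for illustration, and only the table and view names follow the article.

```python
import sqlite3


def build_role_playing_demo():
    """Create one zip code table, two role-playing views and a tiny fact table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE zip_code_dim (
            zip_code_sk INTEGER, zip_code INTEGER, city TEXT, state TEXT);
        INSERT INTO zip_code_dim VALUES
            (1, 17050, 'pittsburgh', 'PA'),
            (7, 44102, 'cleveland', 'OH');

        -- Two views over the same physical table, one per role.
        CREATE VIEW customer_zip_code_dim AS
            SELECT zip_code_sk AS customer_zip_code_sk,
                   zip_code    AS customer_zip_code,
                   state       AS customer_state
              FROM zip_code_dim;
        CREATE VIEW shipping_zip_code_dim AS
            SELECT zip_code_sk AS shipping_zip_code_sk,
                   zip_code    AS shipping_zip_code,
                   state       AS shipping_state
              FROM zip_code_dim;

        -- Hypothetical, heavily trimmed fact table (illustration only).
        CREATE TABLE sales_order_fact (
            order_number INTEGER,
            customer_zip_code_sk INTEGER,
            shipping_zip_code_sk INTEGER);
        INSERT INTO sales_order_fact VALUES (101, 1, 7), (102, 7, 7);
    """)
    return conn


def orders_with_both_zip_codes(conn):
    """Join the fact table to the same dimension twice, once per role."""
    return conn.execute("""
        SELECT f.order_number, c.customer_zip_code, s.shipping_zip_code
          FROM sales_order_fact f
          JOIN customer_zip_code_dim c
            ON f.customer_zip_code_sk = c.customer_zip_code_sk
          JOIN shipping_zip_code_dim s
            ON f.shipping_zip_code_sk = s.shipping_zip_code_sk
         ORDER BY f.order_number
    """).fetchall()
```

Because each view renames the columns for its role, a query can join both views without alias clashes, while the zip code data itself is stored and maintained only once.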
The script below modifies the data warehouse schema. It makes the following changes.
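  • Create the zip code dimension table zip_code_dim.
  • Perform the initial load of the zip code data.
  • Create the customer_zip_code_dim and shipping_zip_code_dim views on top of zip_code_dim.
  • Add the customer_zip_code_sk and shipping_zip_code_sk columns to the sales_order_fact table.
  • Perform the initial load of the two zip code surrogate keys from the existing customer and shipping zip codes.
  • Drop the customer and shipping zip code columns, together with their city and state columns, from the customer_dim table.
  • Drop the customer city, state and zip code columns from pa_customer_dim.
  • Create the factory_zip_code_dim view on top of zip_code_dim.
  • Add the factory_zip_code_sk column to the production_fact table.
  • Load the factory_zip_code_sk values from the existing factory zip codes.
  • Drop the factory zip code column, together with its city and state columns, from the factory_dim table.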
use dw;  

-- Create the zip code dimension table
create table zip_code_dim (  
    zip_code_sk int,  
    zip_code int,  
    city varchar(30),  
    state varchar(2),  
    version int,  
    effective_date date,  
    expiry_date date  
)
clustered by (zip_code_sk) into 8 buckets        
stored as orc tblproperties ('transactional'='true'); 
  
-- Initial load of the zip code data
insert into zip_code_dim values (1,17050,'pittsburgh','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (2,17051,'mc veytown','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (3,17052,'mapleton depot','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (4,17053,'marysville','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (5,17054,'mattawana','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (6,17055,'mechanicsburg','PA',1,'1900-01-01','2200-01-01');  
insert into zip_code_dim values (7,44102,'cleveland','OH',1,'1900-01-01','2200-01-01');  
  
-- Create the customer and shipping role-playing views
create view customer_zip_code_dim (customer_zip_code_sk , customer_zip_code , customer_city , customer_state , version , effective_date , expiry_date) as  
    select   
        zip_code_sk,  
        zip_code,  
        city,  
        state,  
        version,  
        effective_date,  
        expiry_date  
    from  
        zip_code_dim;  
  
create view shipping_zip_code_dim (shipping_zip_code_sk , shipping_zip_code , shipping_city , shipping_state , version , effective_date , expiry_date) as  
    select   
        zip_code_sk,  
        zip_code,  
        city,  
        state,  
        version,  
        effective_date,  
        expiry_date  
    from  
        zip_code_dim;  
  
-- Add the zip code surrogate keys by rebuilding sales_order_fact
alter table sales_order_fact rename to sales_order_fact_old;
create table sales_order_fact(                                                    
   order_number int COMMENT 'order number',                                         
   customer_sk int COMMENT 'customer surrogate key',
   customer_zip_code_sk int COMMENT 'customer zip code sk',
   shipping_zip_code_sk int COMMENT 'shipping zip code sk',
   product_sk int COMMENT 'product surrogate key',     
   sales_order_attribute_sk int COMMENT 'sales order attribute surrogate key',    
   order_date_sk int COMMENT 'order date surrogate key',     
   entry_date_sk int COMMENT 'entry date surrogate key',   
   allocate_date_sk int COMMENT 'allocate date surrogate key',                         
   allocate_quantity int COMMENT 'allocate quantity',                                  
   packing_date_sk int COMMENT 'packing date surrogate key',                           
   packing_quantity int COMMENT 'packing quantity',                                    
   ship_date_sk int COMMENT 'ship date surrogate key',                                 
   ship_quantity int COMMENT 'ship quantity',                                          
   receive_date_sk int COMMENT 'receive date surrogate key',                           
   receive_quantity int COMMENT 'receive quantity',                                    
   request_delivery_date_sk int COMMENT 'request delivery date surrogate key',         
   order_amount decimal(10,2) COMMENT 'order amount',                                  
   order_quantity int COMMENT 'order quantity')      
clustered by (order_number) into 8 buckets        
stored as orc tblproperties ('transactional'='true');   
insert into sales_order_fact  
select order_number,    
       customer_sk, 
       null,
       null,
       product_sk,  
       sales_order_attribute_sk,  
       order_date_sk,  
       entry_date_sk,  
       allocate_date_sk,              
       allocate_quantity,                   
       packing_date_sk,                     
       packing_quantity,                           
       ship_date_sk,                         
       ship_quantity,                                  
       receive_date_sk,                
       receive_quantity,                 
       request_delivery_date_sk,  
       order_amount,                             
       order_quantity  
  from sales_order_fact_old;  
drop table sales_order_fact_old;  
  
-- Initial load of the two zip code surrogate keys
drop table if exists tmp;
create table tmp as 
select t1.order_number,    
       t1.customer_sk, 
       t2.customer_zip_code_sk,
       t3.shipping_zip_code_sk,
       t1.product_sk,  
       t1.sales_order_attribute_sk,  
       t1.order_date_sk,  
       t1.entry_date_sk,  
       t1.allocate_date_sk,              
       t1.allocate_quantity,                   
       t1.packing_date_sk,                     
       t1.packing_quantity,                           
       t1.ship_date_sk,                         
       t1.ship_quantity,                                  
       t1.receive_date_sk,                
       t1.receive_quantity,                 
       t1.request_delivery_date_sk,  
       t1.order_amount,                             
       t1.order_quantity
  from sales_order_fact t1 
  left join
(select a.order_number order_number,c.customer_zip_code_sk customer_zip_code_sk
  from sales_order_fact a,
       customer_dim b,
       customer_zip_code_dim c
 where a.customer_sk = b.customer_sk
   and b.customer_zip_code = c.customer_zip_code) t2 on t1.order_number = t2.order_number
  left join
(select a.order_number order_number,c.shipping_zip_code_sk shipping_zip_code_sk
  from sales_order_fact a,
       customer_dim b,
       shipping_zip_code_dim c
 where a.customer_sk = b.customer_sk
   and b.shipping_zip_code = c.shipping_zip_code) t3 on t1.order_number = t3.order_number;
delete from sales_order_fact where sales_order_fact.order_number in (select order_number from tmp);
insert into sales_order_fact select * from tmp;

alter table customer_dim rename to customer_dim_old;
create table customer_dim 
(customer_sk int COMMENT 'surrogate key',
 customer_number int COMMENT 'number',
 customer_name varchar(50) COMMENT 'name',
 customer_street_address varchar(50) COMMENT 'address',
 shipping_address varchar(50) COMMENT 'shipping_address',
 version int COMMENT 'version',
 effective_date date COMMENT 'effective date',
 expiry_date date COMMENT 'expiry date')   
clustered by (customer_sk) into 8 buckets        
stored as orc tblproperties ('transactional'='true');
insert into customer_dim
select customer_sk,
       customer_number,
       customer_name,
       customer_street_address,
       shipping_address,
       version,
       effective_date,
       expiry_date
  from customer_dim_old;
drop table customer_dim_old;
 
alter table pa_customer_dim rename to pa_customer_dim_old;
create table pa_customer_dim
(customer_sk int,
 customer_number int,
 customer_name varchar(50),
 customer_street_address varchar(50),
 shipping_address varchar(50),
 version int,
 effective_date date,
 expiry_date date)
clustered by (customer_sk) into 8 buckets        
stored as orc tblproperties ('transactional'='true');
insert into pa_customer_dim
select customer_sk,
       customer_number,
       customer_name,
       customer_street_address,
       shipping_address,
       version,
       effective_date,
       expiry_date
  from pa_customer_dim_old;
drop table pa_customer_dim_old;

-- Create the factory role-playing view
create view factory_zip_code_dim (factory_zip_code_sk , factory_zip_code , factory_city , factory_state , version,effective_date , expiry_date) as  
    select   
        zip_code_sk,  
        zip_code,  
        city,  
        state,  
        version,  
        effective_date,  
        expiry_date  
    from  
        zip_code_dim;  
  
alter table production_fact rename to production_fact_old;
create table production_fact
(product_sk int,
 production_date_sk int,
 factory_sk int, 
 factory_zip_code_sk int,
 production_quantity int);
-- Initial load of the factory zip code surrogate key
insert into production_fact
select a.product_sk,
       a.production_date_sk,
       a.factory_sk,
       c.factory_zip_code_sk,
       a.production_quantity
  from production_fact_old a, 
       factory_dim b,
       factory_zip_code_dim c
 where a.factory_sk = b.factory_sk
   and b.factory_zip_code = c.factory_zip_code;
drop table production_fact_old;   
	   
-- Drop the factory zip code column and its city and state columns from factory_dim
alter table factory_dim rename to factory_dim_old;
create table factory_dim 
(factory_sk int,
 factory_code int,
 factory_name varchar(30),
 factory_street_address varchar(50),
 version int,
 effective_date date,
 expiry_date date)
clustered by (factory_sk) into 8 buckets        
stored as orc tblproperties ('transactional'='true');
insert into factory_dim 
select factory_sk,
       factory_code,
       factory_name,
       factory_street_address,
       version,
       effective_date,
       expiry_date
  from factory_dim_old;
drop table factory_dim_old;
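After running the schema modification script, query the customer_zip_code_dim, shipping_zip_code_dim and factory_zip_code_dim dimension views and the sales_order_fact and production_fact fact tables to confirm that the zip codes have been separated out successfully.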
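One check worth adding to that confirmation: the initial load of sales_order_fact uses left joins, so an order whose customer or shipping zip code has no row in zip_code_dim keeps a NULL surrogate key. The sketch below counts such rows. It is an illustration only, assuming Python's sqlite3 module in place of Hive; the function names and the three sample rows are hypothetical.

```python
import sqlite3


def count_unresolved_zip_keys(conn):
    """Return (rows without a customer zip key, rows without a shipping zip key)."""
    return conn.execute("""
        SELECT COALESCE(SUM(customer_zip_code_sk IS NULL), 0),
               COALESCE(SUM(shipping_zip_code_sk IS NULL), 0)
          FROM sales_order_fact
    """).fetchone()


def build_unresolved_demo():
    """Hypothetical fact rows: order 2 lacks one key, order 3 lacks both."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE sales_order_fact (
            order_number INTEGER,
            customer_zip_code_sk INTEGER,
            shipping_zip_code_sk INTEGER);
        INSERT INTO sales_order_fact VALUES
            (1, 1, 1), (2, NULL, 3), (3, NULL, NULL);
    """)
    return conn
```

A result of (0, 0) means every order found both of its zip codes in the new dimension; anything else points to zip codes missing from the initial zip_code_dim load.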

2. Modifying the Regular Load Script

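The regular load changes in three places:
  • Remove every zip-code-related column from the customer dimension load, because the customer dimension no longer holds customer or shipping zip code information.
  • Reference the surrogate keys from the customer zip code view and the shipping zip code view when loading the fact table.
  • Change the pa_customer_dim load, because the customer zip code now has to be obtained through customer_zip_code_sk in the sales order fact table.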
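The second change, resolving a zip code to its surrogate key for a given order date, can be sketched as a small lookup. This is an illustration under stated assumptions, not the article's Hive code: it uses Python's sqlite3 module, stores dates as ISO strings, and treats the validity range as half-open (effective_date inclusive, expiry_date exclusive), which is how the regular load filters the customer and product dimensions. The second version of zip code 17050 (surrogate key 8) is hypothetical and exists only to show the boundary behaviour.

```python
import sqlite3


def build_zip_dim():
    """Create a zip_code_dim with a hypothetical second version of 17050."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE zip_code_dim (
            zip_code_sk INTEGER, zip_code INTEGER,
            effective_date TEXT, expiry_date TEXT);
        INSERT INTO zip_code_dim VALUES
            (1, 17050, '1900-01-01', '2016-08-08'),
            (8, 17050, '2016-08-08', '2200-01-01'),
            (6, 17055, '1900-01-01', '2200-01-01');
    """)
    return conn


def lookup_zip_code_sk(conn, zip_code, as_of):
    """Return the surrogate key valid on as_of, or None if there is none."""
    row = conn.execute("""
        SELECT zip_code_sk
          FROM zip_code_dim
         WHERE zip_code = ?
           AND effective_date <= ?
           AND ? < expiry_date
    """, (zip_code, as_of, as_of)).fetchone()
    return row[0] if row else None
```

With a half-open range, a date that is both the expiry date of one version and the effective date of the next matches exactly one row, so a fact row cannot be duplicated by the lookup.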
The modified regular_etl.sql script is shown below.
-- Set up the environment and the time window
!run /root/set_time.sql   
    
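-- Load the customer dimension
-- Expire SCD2 rows for deleted records and for changes in the address columns; the <=> operator makes the comparison NULL-safe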
UPDATE customer_dim     
   SET expiry_date = ${hivevar:pre_date}      
 WHERE customer_dim.customer_sk IN      
(SELECT a.customer_sk     
   FROM (SELECT customer_sk,    
                customer_number,    
                customer_street_address,    
                shipping_address                   
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.customer b ON a.customer_number = b.customer_number     
          WHERE b.customer_number IS NULL OR     
          (  !(a.customer_street_address <=> b.customer_street_address)    
          OR !(a.shipping_address <=> b.shipping_address)    
          ));     
    
-- Insert the new SCD2 rows for changes in customer_street_address or shipping_address
INSERT INTO customer_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.shipping_address,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.customer_number customer_number,    
    t2.customer_name customer_name,    
    t2.customer_street_address customer_street_address,    
    t2.shipping_address shipping_address,    
    t1.version + 1 version,    
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM customer_dim t1     
INNER JOIN rds.customer t2      
   ON t1.customer_number = t2.customer_number       
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN customer_dim t3     
   ON t1.customer_number = t3.customer_number     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)    
   OR  !(t1.shipping_address <=> t2.shipping_address)    
   )    
  AND t3.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
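-- Handle SCD1 on the customer_name column
-- Hive's UPDATE does not yet support subqueries in the SET clause, so the rows to update are staged in a temporary table and the update is done as DELETE followed by INSERT
-- SCD1 keeps no history, so every row whose customer_name changed is updated, not only the current version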
DROP TABLE IF EXISTS tmp;    
CREATE TABLE tmp AS    
SELECT    
    a.customer_sk,    
    a.customer_number,    
    b.customer_name,    
    a.customer_street_address,    
    a.shipping_address,    
    a.version,    
    a.effective_date,    
    a.expiry_date    
  FROM customer_dim a, rds.customer b      
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);      
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);      
INSERT INTO customer_dim SELECT * FROM tmp;    
    
-- Load new customer records
INSERT INTO customer_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.shipping_address,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number      
 WHERE t2.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
-- Load the product dimension
-- Expire SCD2 rows for deleted records and for changes in product_name or product_category
UPDATE product_dim    
   SET expiry_date = ${hivevar:pre_date}      
 WHERE product_dim.product_sk IN      
(SELECT a.product_sk     
   FROM (SELECT product_sk,product_code,product_name,product_category     
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.product b ON a.product_code = b.product_code     
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));    
    
-- Insert the new SCD2 rows for changes in product_name or product_category
INSERT INTO product_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.product_code product_code,    
    t2.product_name product_name,    
    t2.product_category product_category,        
    t1.version + 1 version,    
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM product_dim t1     
INNER JOIN rds.product t2      
   ON t1.product_code = t2.product_code      
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN product_dim t3     
   ON t1.product_code = t3.product_code     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
    
-- Load new product records
INSERT INTO product_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code      
 WHERE t2.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    

-- Load the product_count_fact table
insert overwrite table product_count_fact 
select product_sk,date_sk
  from (select a.product_sk product_sk,
               a.product_code product_code,
               b.date_sk date_sk,
               row_number() over (partition by a.product_code order by b.date_sk) rn
          from product_dim a,date_dim b
         where a.effective_date = b.date) t
 where rn = 1;
 
-- Load the sales order fact table
-- Sales orders added on the previous day
INSERT INTO sales_order_fact    
SELECT    
    a.order_number,    
    customer_sk,
    i.customer_zip_code_sk,  
    j.shipping_zip_code_sk,    
    product_sk, 
    g.sales_order_attribute_sk,
    e.order_date_sk,
    h.entry_date_sk,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    f.request_delivery_date_sk,
    order_amount,    
    quantity    
  FROM    
    rds.sales_order a,     
    customer_dim c,    
    product_dim d,    
    order_date_dim e,  
    request_delivery_date_dim f, 
    sales_order_attribute_dim g,
    entry_date_dim h,
    customer_zip_code_dim i,  
    shipping_zip_code_dim j,  
    rds.customer k, 
    rds.cdc_time l
 WHERE 
    a.order_status = 'N'
AND a.customer_number = c.customer_number    
AND a.status_date >= c.effective_date    
AND a.status_date < c.expiry_date 
AND a.customer_number = k.customer_number  
AND k.customer_zip_code = i.customer_zip_code  
AND a.status_date >= i.effective_date  
AND a.status_date < i.expiry_date  
AND k.shipping_zip_code = j.shipping_zip_code  
AND a.status_date >= j.effective_date  
AND a.status_date < j.expiry_date    
AND a.product_code = d.product_code    
AND a.status_date >= d.effective_date    
AND a.status_date < d.expiry_date    
AND to_date(a.status_date) = e.order_date
AND to_date(a.entry_date) = h.entry_date   
AND to_date(a.request_delivery_date) = f.request_delivery_date
AND a.verification_ind = g.verification_ind  
AND a.credit_check_flag = g.credit_check_flag  
AND a.new_customer_ind = g.new_customer_ind  
AND a.web_order_flag = g.web_order_flag 
AND a.entry_date >= l.last_load AND a.entry_date < l.current_load ;    

-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;      
INSERT INTO pa_customer_dim      
SELECT DISTINCT a.*            
  FROM customer_dim a,
       sales_order_fact b,
       customer_zip_code_dim c	 
 WHERE c.customer_state = 'PA' 
   AND b.customer_zip_code_sk = c.customer_zip_code_sk
   AND a.customer_sk = b.customer_sk;

-- Handle the four statuses: allocate, pack, ship and receive
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t2.allocate_date_sk allocate_date_sk,
       t1.quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       allocate_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'A' 
   and to_date(t1.status_date) = t2.allocate_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t2.packing_date_sk packing_date_sk,
       t1.quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       packing_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'P' 
   and to_date(t1.status_date) = t2.packing_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load; 
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t2.ship_date_sk ship_date_sk,
       t1.quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       ship_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'S' 
   and to_date(t1.status_date) = t2.ship_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.customer_zip_code_sk,
       t0.shipping_zip_code_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.entry_date_sk entry_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t2.receive_date_sk receive_date_sk,
       t1.quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       receive_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'R' 
   and to_date(t1.status_date) = t2.receive_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

-- Update the last_load column of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
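The PA customer reload in the script above no longer filters customer_dim on a state column. It reaches the state through sales_order_fact and the customer zip code view instead. A side effect worth knowing is that a customer with no sales orders cannot appear in pa_customer_dim at all. The sketch below reproduces that join shape on a few hypothetical rows, assuming Python's sqlite3 module rather than Hive and trimmed two or three column tables; the function names are made up.

```python
import sqlite3


def build_pa_demo():
    """Three customers; customer 3 has placed no orders (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customer_dim (customer_sk INTEGER, customer_number INTEGER);
        INSERT INTO customer_dim VALUES (1, 1), (2, 2), (3, 3);

        CREATE TABLE zip_code_dim (zip_code_sk INTEGER, zip_code INTEGER, state TEXT);
        INSERT INTO zip_code_dim VALUES (1, 17050, 'PA'), (7, 44102, 'OH');

        CREATE TABLE sales_order_fact (
            order_number INTEGER, customer_sk INTEGER, customer_zip_code_sk INTEGER);
        INSERT INTO sales_order_fact VALUES (101, 1, 1), (102, 1, 1), (103, 2, 7);
    """)
    return conn


def pa_customers(conn):
    """Customers with at least one order whose customer zip code is in PA."""
    return conn.execute("""
        SELECT DISTINCT a.customer_sk, a.customer_number
          FROM customer_dim a
          JOIN sales_order_fact b ON a.customer_sk = b.customer_sk
          JOIN zip_code_dim c ON b.customer_zip_code_sk = c.zip_code_sk
         WHERE c.state = 'PA'
         ORDER BY a.customer_sk
    """).fetchall()
```

DISTINCT is needed because a customer with several orders would otherwise be returned once per order, which is the same reason the reload in the script selects DISTINCT a.*.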

3. Testing the Modified Regular Load

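Some preparation is needed before running the modified regular load script. First make the following two changes to the customer data in the source system:
  • Change the customer and shipping zip codes of customer number 4 from 17050 to 17055.
  • Add a new customer with number 15.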
Use the following statements to make the changes:
update source.customer 
   set customer_street_address = '9999 Louise Dr.',
       customer_zip_code = 17055, 
       customer_city = 'Pittsburgh',
       shipping_address = '9999 Louise Dr.',
       shipping_zip_code = 17055,
       shipping_city = 'Pittsburgh'
 where customer_number = 4;

insert into source.customer 
values(15, 'Super Stores', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA', '1000 Woodland St.', 17055, 'Pittsburgh', 'PA');

COMMIT;
Before loading the new customer data, query the current customer and shipping zip codes. The output can later be compared with the data after the change. The query is as follows.
use dw;
SELECT order_date_sk odsk,
       customer_number cn,
       customer_zip_code czc,
       shipping_zip_code szc
  FROM customer_zip_code_dim a,
       shipping_zip_code_dim b,
       sales_order_fact c,
       customer_dim d
 WHERE a.customer_zip_code_sk = c.customer_zip_code_sk
   AND b.shipping_zip_code_sk = c.shipping_zip_code_sk
   AND d.customer_sk = c.customer_sk;
Then add two sales orders with the following statements.
SET @order_date := from_unixtime(unix_timestamp('2016-08-08 00:00:01') + rand() * (unix_timestamp('2016-08-08 12:00:00') - unix_timestamp('2016-08-08 00:00:01')));      
SET @amount := floor(1000 + rand() * 9000);     
SET @quantity := floor(10 + rand() * 90);  

INSERT INTO source.sales_order VALUES
  (null, 144, 4, 3, 'Y', 'Y', 'Y', 'N',  @order_date, 'N', '2016-08-10',
        @order_date, @amount, @quantity);
		
SET @order_date := from_unixtime(unix_timestamp('2016-08-08 12:00:00') + rand() * (unix_timestamp('2016-08-09 00:00:00') - unix_timestamp('2016-08-08 12:00:00')));      
SET @amount := floor(1000 + rand() * 9000);     
SET @quantity := floor(10 + rand() * 90);  

INSERT INTO source.sales_order VALUES
  (null, 145, 15, 4, 'Y', 'N', 'Y', 'N', @order_date, 'N', '2016-08-10',
       @order_date, @amount, @quantity);
commit;
Use the following SQL statement to modify the time window.
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-08-08', '2016-08-08' FROM rds.cdc_time;
Run the regular load with the following command.
./regular_etl.sh
Query the customer_dim table to confirm that the two changed customers, numbers 4 and 15, were loaded correctly.
select customer_sk csk,
       customer_number cnum,
       customer_name cnam,
       customer_street_address csd,
       shipping_address sd,
       version,
       effective_date,
       expiry_date
  from dw.customer_dim 
 where customer_number in (4, 15);
The query result is shown in the figure below.
Query the sales_order_fact table for the two new sales orders to confirm that the zip codes were loaded correctly.
use dw;
select a.order_number onum,
       f.customer_number cnum,
       b.customer_zip_code czc,
       c.shipping_zip_code szc,
       g.product_code pc,
       d.order_date od,
       e.entry_date ed,
       a.order_amount,
       a.order_quantity
  from sales_order_fact a,
       customer_zip_code_dim b,
       shipping_zip_code_dim c,
       order_date_dim d,
       entry_date_dim e,
       customer_dim f,
       product_dim g
 where a.order_number IN (144, 145)
   and a.customer_sk = f.customer_sk
   and a.product_sk = g.product_sk
   and a.customer_zip_code_sk = b.customer_zip_code_sk
   and a.shipping_zip_code_sk = c.shipping_zip_code_sk
   and a.order_date_sk = d.order_date_sk
   and a.entry_date_sk = e.entry_date_sk;
The query result is shown in the figure below.
Query the pa_customer_dim table to confirm that the PA customers were loaded correctly.
select customer_sk csk,
       customer_number cnum,
       customer_name cnam,
       customer_street_address csa,
       shipping_address sad,
       version,
       effective_date,
       expiry_date
  from dw.pa_customer_dim;
The query result is shown in the figure below.

4. Modifying the Regular Production Load

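As with the regular data warehouse load, every zip-code-related column has to be removed from the factory dimension load, and the factory zip code surrogate key has to be used when loading the production fact table. The modified regular_etl_daily_production.sql script is shown below.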
-- Set up the environment and the time window
!run /root/set_time.sql

-- Factory data rarely changes and its history is generally not needed, so SCD1 is used
drop table if exists tmp;
create table tmp as 
select a.factory_sk,
       a.factory_code,
       b.factory_name,
       b.factory_street_address,
       a.version,
       a.effective_date,
       a.expiry_date
  from factory_dim a,rds.factory_master b
 where a.factory_code = b.factory_code and 
     !(a.factory_name <=> b.factory_name 
   and a.factory_street_address <=> b.factory_street_address
   );

delete from factory_dim where factory_dim.factory_sk in (select factory_sk from tmp);
insert into factory_dim select * from tmp;

-- Add new factory records
INSERT INTO factory_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.factory_code) + t2.sk_max,    
    t1.factory_code,    
    t1.factory_name,    
    t1.factory_street_address,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.factory_master t1 LEFT JOIN factory_dim t2 ON t1.factory_code = t2.factory_code      
 WHERE t2.factory_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(factory_sk),0) sk_max FROM factory_dim) t2; 

-- Load the daily production fact table
INSERT INTO production_fact  
SELECT  
  b.product_sk  
, c.date_sk  
, d.factory_sk  
, e.factory_zip_code_sk
, production_quantity  
FROM  
  rds.daily_production a  
, product_dim b  
, date_dim c  
, factory_dim d
, factory_zip_code_dim e
, rds.factory_master f 
WHERE  
    production_date = ${hivevar:pre_date} 
AND a.product_code = b.product_code  
AND a.production_date >= b.effective_date  
AND a.production_date < b.expiry_date
AND a.factory_code = f.factory_code  
AND f.factory_zip_code = e.factory_zip_code  
AND a.production_date >= e.effective_date  
AND a.production_date < e.expiry_date   
AND a.production_date = c.date  
AND a.factory_code = d.factory_code ;
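The SCD1 step at the top of this script detects changed factories with the NULL-safe <=> operator. Why that matters can be shown on a few rows. The sketch below is an illustration that assumes Python's sqlite3 module, where the IS operator plays the role of Hive's <=>; the unqualified factory_master table, the sample rows and the function names are hypothetical.

```python
import sqlite3


def build_factory_demo():
    """Factory 3 gains an address (NULL to a value); factory 4 is renamed."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE factory_dim (
            factory_code INTEGER, factory_name TEXT, factory_street_address TEXT);
        CREATE TABLE factory_master (
            factory_code INTEGER, factory_name TEXT, factory_street_address TEXT);
        INSERT INTO factory_dim VALUES
            (1, 'First', 'addr1'), (2, 'Second', NULL),
            (3, 'Third', NULL),    (4, 'Fourth', 'x');
        INSERT INTO factory_master VALUES
            (1, 'First', 'addr1'), (2, 'Second', NULL),
            (3, 'Third', 'new addr'), (4, 'Fourth 2', 'x');
    """)
    return conn


def changed_factories(conn, null_safe=True):
    """Return factory codes whose name or street address differs from the source."""
    op = "IS" if null_safe else "="  # IS is SQLite's NULL-safe equality
    return conn.execute(f"""
        SELECT a.factory_code
          FROM factory_dim a
          JOIN factory_master b ON a.factory_code = b.factory_code
         WHERE NOT (a.factory_name {op} b.factory_name
                AND a.factory_street_address {op} b.factory_street_address)
         ORDER BY a.factory_code
    """).fetchall()
```

With plain equality, comparing NULL with 'new addr' yields NULL, the whole predicate becomes NULL, and the changed row for factory 3 is silently skipped. The NULL-safe form reports it and still treats two NULL addresses (factory 2) as equal.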

5. Testing the Modified Regular Production Load

Add a new factory.
insert into source.factory_master
values (5,'Fifth Factory','90909 McNicholds Blvd.',17055,'Pittsburgh','PA');
commit;
Add three daily production records to the daily_production table.
INSERT INTO source.daily_production VALUES
  (1, '2016-08-08', 3, 400 )
, (3, '2016-08-08', 4, 200 )
, (5, '2016-08-08', 5, 100 );
commit;
Modify the time window.
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-08-08', '2016-08-08' FROM rds.cdc_time;
Run the regular production load.
./regular_etl_daily_production.sh
Query factory_dim to confirm that the load is correct.
select factory_sk,
       factory_code,
       factory_name,
       factory_street_address,
       version,
       effective_date,
       expiry_date	   
  from dw.factory_dim;
The query result is shown in the figure below.
Query the production_fact table to confirm that the three new daily production records were loaded correctly.
use dw;
select e.product_code pc,
       b.date,
       c.factory_code fc,
       d.factory_zip_code fzc,
       a.production_quantity qty
  from production_fact a,
       date_dim b,
       factory_dim c,
       factory_zip_code_dim d,
       product_dim e
 where a.product_sk = e.product_sk
   and a.production_date_sk = b.date_sk
   and a.factory_sk = c.factory_sk
   and a.factory_zip_code_sk = d.factory_zip_code_sk;
The query result is shown in the figure below.