前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HAWQ取代传统数仓实践(十六)——事实表技术之迟到的事实

HAWQ取代传统数仓实践(十六)——事实表技术之迟到的事实

作者头像
用户1148526
发布2018-01-03 16:47:58
1.4K0
发布2018-01-03 16:47:58
举报
文章被收录于专栏:Hadoop数据仓库

一、迟到的事实简介

        数据仓库通常建立于一种理想的假设情况下,这就是数据仓库的度量(事实记录)与度量的环境(维度记录)同时出现在数据仓库中。当同时拥有事实记录和正确的当前维度行时,就能够从容地首先维护维度键,然后在对应的事实表行中使用这些最新的键。然而,各种各样的原因会导致需要ETL系统处理迟到的事实数据。例如,某些线下的业务,数据进入操作型系统的时间会滞后于事务发生的时间。再或者出现某些极端情况,如源数据库系统出现故障,直到恢复后才能补上故障期间产生的数据。

        在销售订单示例中,晚于订单日期进入源数据的销售订单可以看做是一个迟到事实的例子。销售订单数据被装载进其对应的事实表时,装载日期晚于销售订单产生的日期,因此是一个迟到的事实。本例中因为定期装载的是前一天的数据,所以这里的“晚于”指的是事务数据延迟两天及其以上才到达ETL系统。

        必须对标准的ETL过程进行特殊修改以处理迟到的事实。首先,当迟到度量事件出现时,不得不反向搜索维度表历史记录,以确定事务发生时间点的有效的维度代理键,因为当前的维度内容无法匹配输入行的情况。此外,还需要调整后续事实行中的所有半可加度量,例如,由于迟到的事实导致客户当前余额的改变。迟到事实可能还会引起周期快照事实表的数据更新,如果2017年5月的销售订单金额已经计算并存储在month_end_sales_order_fact快照表中,这时一个迟到的5月订单在6月某天被装载,那么2017年5月的快照金额必须因迟到事实而重新计算。

        下面就以销售订单数据仓库为例,说明如何处理迟到的事实。

二、修改数据仓库表结构

        在“HAWQ取代传统数仓实践(十三)——事实表技术之周期快照”中建立的月销售周期快照表,其数据源自已经处理过的销售订单事务事实表。因此为了确定事实表中的一条销售订单记录是否是迟到的,需要把源数据中的登记日期列装载进销售订单事实表。为此在要销售订单事实表上添加登记日期代理键列。为了获取登记日期代理键的值,还要使用维度角色扮演技术添加登记日期维度表。

        执行下面的脚本在销售订单事实表里添加名为entry_date_sk的日期代理键列,并且从日期维度表创建一个叫做v_entry_date_dim的数据库视图。

代码语言:javascript
复制
set search_path=tds;

-- 给销售订单事实表增加登记日期代理键
alter table sales_order_fact add column entry_date_sk int default null;
comment on column sales_order_fact.entry_date_sk is '登记日期代理键';

-- 建立登记日期维度视图
create view v_entry_date_dim 
(entry_date_sk, entry_date, month_name, month, quarter, year)   
as    
select date_sk, date, month_name, month, quarter, year   
  from date_dim;

三、修改定期数据装载函数

        在创建了登记日期维度视图,并给销售订单事实表添加了登记日期代理键列以后,需要修改数据仓库定期装载脚本来装载登记日期。修改后的装载函数如下。注意sales_order源数据表及其对应的过渡表中都已经含有登记日期,只是以前没有将其装载进数据仓库。

代码语言:javascript
复制
create or replace function fn_regular_load ()                    
returns void as                    
$$                    
declare                    
    -- 设置scd的生效时间                  
    v_cur_date date := current_date;                      
    v_pre_date date := current_date - 1;                  
    v_last_load date;                  
begin                  
    -- 分析外部表                  
    analyze ext.customer;                  
    analyze ext.product;                  
    analyze ext.sales_order;                  
                  
    -- 将外部表数据装载到原始数据表                  
    truncate table rds.customer;                    
    truncate table rds.product;                   
                  
    insert into rds.customer select * from ext.customer;                   
    insert into rds.product select * from ext.product;                  
    insert into rds.sales_order             
    select order_number,            
           customer_number,            
           product_code,            
           status_date,            
           entry_date,            
           order_amount,            
           quantity,            
           request_delivery_date,        
           verification_ind,        
           credit_check_flag,        
           new_customer_ind,        
           web_order_flag,    
           order_status            
      from ext.sales_order;                  
                      
    -- 分析rds模式的表                  
    analyze rds.customer;                  
    analyze rds.product;                  
    analyze rds.sales_order;                  
                  
    -- 设置cdc的上限时间                  
    select last_load into v_last_load from rds.cdc_time;                  
    truncate table rds.cdc_time;                  
    insert into rds.cdc_time select v_last_load, v_cur_date;                  
                  
    -- 装载客户维度                  
    insert into tds.customer_dim                  
    (customer_number,                  
     customer_name,                  
     customer_street_address,                  
     shipping_address,                 
     isdelete,                  
     version,                  
     effective_date)                  
    select case flag                   
                when 'D' then a_customer_number                  
                else b_customer_number                  
            end customer_number,                  
           case flag                   
                when 'D' then a_customer_name                  
                else b_customer_name                  
            end customer_name,                  
           case flag                   
                when 'D' then a_customer_street_address                  
                else b_customer_street_address                  
            end customer_street_address,                  
           case flag                   
                when 'D' then a_shipping_address                  
                else b_shipping_address                  
            end shipping_address,                
           case flag                   
                when 'D' then true                  
                else false                  
            end isdelete,                  
           case flag                   
                when 'D' then a_version                  
                when 'I' then 1                  
                else a_version + 1                  
            end v,                  
           v_pre_date                  
      from (select a.customer_number a_customer_number,                  
                   a.customer_name a_customer_name,                  
                   a.customer_street_address a_customer_street_address,                  
                   a.shipping_address a_shipping_address,                  
                   a.version a_version,                  
                   b.customer_number b_customer_number,                  
                   b.customer_name b_customer_name,                  
                   b.customer_street_address b_customer_street_address,                  
                   b.shipping_address b_shipping_address,                  
                   case when a.customer_number is null then 'I'                  
                        when b.customer_number is null then 'D'                  
                        else 'U'                   
                    end flag                  
              from v_customer_dim_latest a                   
              full join rds.customer b on a.customer_number = b.customer_number                   
             where a.customer_number is null -- 新增                  
                or b.customer_number is null -- 删除                  
                or (a.customer_number = b.customer_number                   
                    and not                   
                           (coalesce(a.customer_name,'') = coalesce(b.customer_name,'')                   
                        and coalesce(a.customer_street_address,'') = coalesce(b.customer_street_address,'')                   
                        and coalesce(a.shipping_address,'') = coalesce(b.shipping_address,'')                   
                        ))) t                  
             order by coalesce(a_customer_number, 999999999999), b_customer_number limit 999999999999;                  
               
    -- 装载产品维度                  
    insert into tds.product_dim                  
    (product_code,                  
     product_name,                  
     product_category,                       
     isdelete,                  
     version,                  
     effective_date)                  
    select case flag                   
                when 'D' then a_product_code                  
                else b_product_code                  
            end product_code,                  
           case flag                   
                when 'D' then a_product_name                  
                else b_product_name                  
            end product_name,                  
           case flag                   
                when 'D' then a_product_category                  
                else b_product_category                  
            end product_category,                  
           case flag                   
                when 'D' then true                  
                else false                  
            end isdelete,                  
           case flag                   
                when 'D' then a_version                  
                when 'I' then 1                  
                else a_version + 1                  
            end v,                  
           v_pre_date                  
      from (select a.product_code a_product_code,                  
                   a.product_name a_product_name,                  
                   a.product_category a_product_category,                  
                   a.version a_version,                  
                   b.product_code b_product_code,                  
                   b.product_name b_product_name,                  
                   b.product_category b_product_category,                                 
                   case when a.product_code is null then 'I'                  
                        when b.product_code is null then 'D'                  
                        else 'U'                   
                    end flag                  
              from v_product_dim_latest a                   
              full join rds.product b on a.product_code = b.product_code                   
             where a.product_code is null -- 新增                  
                or b.product_code is null -- 删除                  
                or (a.product_code = b.product_code                   
                    and not                   
                           (a.product_name = b.product_name                   
                        and a.product_category = b.product_category))) t                  
             order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;                  
          
    -- 装载新增产品数量无事实事实表  
    insert into tds.product_count_fact  
    select a.product_sk, b.date_sk   
      from tds.product_dim a, tds.date_dim b   
     where a.version = 1  
       and a.effective_date = v_pre_date  
       and a.effective_date = b.date;   
      
    -- 装载销售订单事实表                    
    insert into sales_order_fact                    
    select a.order_number,                    
           customer_sk,                    
           product_sk,         
           e.date_sk,                  
           e.year * 100 + e.month,                       
           order_amount,                
           quantity,            
           f.date_sk,        
           g.sales_order_attribute_sk,      
           h.customer_zip_code_sk,          
           i.shipping_zip_code_sk,    
           a.order_status,
           l.entry_date_sk		   
      from rds.sales_order a,                   
           v_customer_dim_his c,                    
           v_product_dim_his d,                    
           date_dim e,             
           date_dim f,          
           sales_order_attribute_dim g,       
           v_customer_zip_code_dim h,          
           v_shipping_zip_code_dim i,          
           rds.customer j,      
           rds.cdc_time k,
           v_entry_date_dim l		   
     where a.customer_number = c.customer_number                    
       and a.status_date >= c.effective_date                  
       and a.status_date < c.expiry_date                     
       and a.product_code = d.product_code                    
       and a.status_date >= d.effective_date                  
       and a.status_date < d.expiry_date                     
       and date(a.status_date) = e.date              
       and date(a.request_delivery_date) = f.date
       and date(a.entry_date) = l.entry_date  
       and a.verification_ind = g.verification_ind            
       and a.credit_check_flag = g.credit_check_flag            
       and a.new_customer_ind = g.new_customer_ind            
       and a.web_order_flag = g.web_order_flag       
       and a.customer_number = j.customer_number          
       and j.customer_zip_code = h.customer_zip_code      
       and j.shipping_zip_code = i.shipping_zip_code       
       and a.entry_date >= k.last_load and a.entry_date < k.current_load;                                
          
    -- 重载PA客户维度                
    truncate table pa_customer_dim;                  
    insert into pa_customer_dim                  
    select distinct a.*                    
      from customer_dim a,        
           sales_order_fact b,        
           v_customer_zip_code_dim c           
     where c.customer_state = 'pa'         
       and b.customer_zip_code_sk = c.customer_zip_code_sk        
       and a.customer_sk = b.customer_sk;        
           
    -- 分析tds模式的表                  
    analyze customer_dim;                  
    analyze product_dim;                  
    analyze sales_order_fact;       
    analyze pa_customer_dim;          
                  
    -- 更新时间戳表的last_load字段                    
    truncate table rds.cdc_time;                  
    insert into rds.cdc_time select v_cur_date, v_cur_date;                  
                  
end;                    
$$                    
language plpgsql;   

        在装载脚本中使用销售订单过渡表的状态日期字段限定当时的维度代理键。例如,为了获取事务发生时的客户代理键,筛选条件为:

代码语言:javascript
复制
status_date >= v_customer_dim_his.effective_date and status_date < v_customer_dim_his.expiry_date

        之所以可以这样做,原因在于本示例满足以下两个前提条件:在最初源数据库的销售订单表中,status_date存储的是状态发生时的时间;维度的生效时间与过期时间构成一条连续且不重叠的时间轴,任意status_date日期只能落到唯一的生效时间、过期时间区间内。

四、修改装载周期快照事实表的函数

        “HAWQ取代传统数仓实践(十三)——事实表技术之周期快照”中创建的fn_month_sum函数用于装载月销售周期快照事实表。迟到的事实记录会对周期快照中已经生成的月销售汇总数据产生影响,因此必须做适当的修改。

        月销售周期快照表存储的是某月某产品汇总的销售数量和销售金额,表中有年月、产品代理键、销售金额、销售数量四个字段。由于迟到事实的出现,需要将事务事实表中的数据划分为两类:上月的周期快照和更早的周期快照。

        fn_month_sum函数先删除在生成上个月的汇总数据再重新生成,此时上月的迟到数据可以正确汇总。对于上上个月或更早的迟到数据,需要将迟到的数据累加到已有的周期快照上。下面修改fn_month_sum函数,使之能够自动处理任意时间的迟到事实数据。HAWQ不能行级更新或删除数据,因此为了实现所谓的幂等操作,需要标识出迟到事实记录对应的事实表逻辑主键,在重复执行周期快照装载函数时过滤掉已经装载过的迟到数据。

1. 给周期快照事实表增加事务事实表的逻辑主键

代码语言:javascript
复制
alter table month_end_sales_order_fact add order_number bigint default null;

        正常数据(非迟到)对应的order_number字段值为空。

2. 修改周期快照事实表装载函数

代码语言:javascript
复制
create or replace function tds.fn_month_sum(p_year_month int)   
returns void as   
$$  
declare      
    sqlstring varchar(1000);     
begin  
    -- 幂等操作,先删除上月数据  
    sqlstring := 'truncate table month_end_sales_order_fact_1_prt_p' || cast(p_year_month as varchar);  
    execute sqlstring;  
  
    -- 插入上月销售汇总数据  
    insert into month_end_sales_order_fact    
    select t1.year_month,   
           t2.product_sk,   
           coalesce(t2.month_order_amount,0),   
           coalesce(t2.month_order_quantity,0),
           null     
      from (select p_year_month year_month) t1   
      left join (select year_month, product_sk, sum(order_amount) month_order_amount, sum(quantity) month_order_quantity  
                   from sales_order_fact   
                  where year_month = p_year_month and coalesce(order_status,'N') = 'N' 
                  group by year_month,product_sk) t2   
           on t1.year_month = t2.year_month;  
  
    -- 装载迟到的数据  
    insert into month_end_sales_order_fact      
    select year_month, product_sk, order_amount, quantity, order_number     
      from (select t1.year_month,   
                   t1.product_sk,   
                   t1.order_amount,   
                   t1.quantity,
                   t1.order_number
              from sales_order_fact t1, v_entry_date_dim t2     
             where coalesce(t1.entry_date_sk, t1.status_date_sk) = t2.entry_date_sk  
               and t2.year*100 + t2.month = p_year_month
               and t1.year_month < p_year_month
               and coalesce(t1.order_status,'N') = 'N'
               and not exists (select 1 from month_end_sales_order_fact t3 
                                where t1.order_number = t3.order_number)      
             ) t1; 
    
end;  
$$      
language plpgsql;

        说明:

  • t2.year*100 + t2.month = p_year_month and t1.year_month < p_year_month 处理上个月之前的迟到数据;
  • not exists (select 1 from month_end_sales_order_fact t3 where t1.order_number = t3.order_number) 处理尚未装载的迟到数据,用于实现幂等操作。

3. 建立视图进行二次汇总

代码语言:javascript
复制
create view v_month_end_sales_order_fact as
select year_month, product_sk, sum(month_order_amount) month_order_amount, sum(month_order_quantity) month_order_quantity
  from month_end_sales_order_fact
 group by year_month, product_sk;

五、测试

        在执行定期装载前使用下面的语句查询month_end_sales_order_fact表。之后可以对比‘前’(不包含迟到事实)‘后’(包含了迟到事实)的数据,以确认装载的正确性。

代码语言:javascript
复制
select year_month,  
       product_name,  
       month_order_amount amt,  
       month_order_quantity qty  
  from month_end_sales_order_fact a, 
       product_dim b  
 where a.product_sk = b.product_sk 
   and year_month = cast(extract(year from current_date - interval '1 month') * 100 
                  + extract(month from current_date - interval '1 month') as int)  
order by year_month, product_name;

        查询结果如图1所示。

图1

        下一步执行下面的脚本准备销售订单测试数据。此脚本将三个销售订单装载进销售订单源数据,一个是迟到的在month_end_sales_order_fact中已存在的产品,一个是迟到的在month_end_sales_order_fact中不存在的产品,另一个是非迟到的正常产品。这里需要注意,产品维度是SCD2处理的,所以在添加销售订单时,新增订单时间一定要在产品维度的生效与过期时间区间内。

代码语言:javascript
复制
use source;      

-- 迟到已存在 
set @order_date := from_unixtime(unix_timestamp('2017-05-10') + rand() * (unix_timestamp('2017-05-11') - unix_timestamp('2017-05-10')));      
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@order_date, interval 5 day)) + rand() * 86400);  
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));          
set @amount := floor(1000 + rand() * 9000);           
set @quantity := floor(10 + rand() * 90);        
    
insert into source.sales_order values      
  (null, 143, 6, 2, 'y', 'y', 'y', 'y',  @order_date, 'N', @request_delivery_date,      
        @entry_date, @amount, @quantity);    
 
-- 迟到不存在 
set @order_date := from_unixtime(unix_timestamp('2017-05-10') + rand() * (unix_timestamp('2017-05-11') - unix_timestamp('2017-05-10')));      
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@order_date, interval 5 day)) + rand() * 86400);  
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));          
set @amount := floor(1000 + rand() * 9000);           
set @quantity := floor(10 + rand() * 90);        
    
insert into source.sales_order values      
  (null, 144, 6, 3, 'y', 'y', 'y', 'y',  @order_date, 'N', @request_delivery_date,      
        @entry_date, @amount, @quantity);    

-- 非迟到 
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));   
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@entry_date, interval 5 day)) + rand() * 86400);  
set @amount := floor(1000 + rand() * 9000);           
set @quantity := floor(10 + rand() * 90);        
    
insert into source.sales_order values      
  (null, 145, 12, 4, 'y', 'y', 'y', 'y',  @entry_date, 'N', @request_delivery_date,      
        @entry_date, @amount, @quantity);    

commit;

        新增订单数据如图2所示。

图2

        执行定期装载脚本。

代码语言:javascript
复制
~/regular_etl.sh

        现在已经准备好运行修改后的月底快照装载。手工执行下面的命令执行月底销售订单事实表装载函数导入2017年5月的快照。

代码语言:javascript
复制
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_month_sum(cast(extract(year from current_date - interval '\''1 month'\'') * 100 + extract(month from current_date - interval '\''1 month'\'') as int))"'

        执行相同的查询获取包含了迟到事实月底销售订单数据。查询结果如图3所示。

代码语言:javascript
复制
select year_month,  
       product_name,  
       month_order_amount amt,  
       month_order_quantity qty  
  from v_month_end_sales_order_fact a, 
       product_dim b  
 where a.product_sk = b.product_sk 
   and year_month = cast(extract(year from current_date - interval '1 month') * 100 
                  + extract(month from current_date - interval '1 month') as int)  
order by year_month, product_name;

图3

        对比‘前’‘后’查询的结果可以看到:

  • 2017年5月Floppy Drive的销售金额已经从52083变为57707,这是由于迟到的产品销售订单增加了5624的销售金额。销售数量也相应的增加了。
  • 2017年5月的LCD Panel(也是迟到的产品)被添加。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017年06月08日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、迟到的事实简介
  • 二、修改数据仓库表结构
  • 三、修改定期数据装载函数
  • 四、修改装载周期快照事实表的函数
    • 1. 给周期快照事实表增加事务事实表的逻辑主键
      • 2. 修改周期快照事实表装载函数
        • 3. 建立视图进行二次汇总
        • 五、测试
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档