Data Warehouse Practice on the Hadoop Ecosystem: Advanced Techniques (10)

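X. Junk Dimensions

This section discusses junk dimensions. Simply put, a junk dimension is a dimension whose attributes each have only a few possible values. A sales order, for example, may carry many discrete yes/no values, such as: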

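  • verification_ind (yes if the order has been verified)
  • credit_check_flag (whether the customer's credit status has been checked for this order)
  • new_customer_ind (yes if this is a new customer's first order)
  • web_order_flag (whether the order was placed online)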

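Data like this is often used to enrich sales analysis. There can be many such attributes, but each has very few possible values, which makes them a good fit for a special dimension type called a junk dimension.

1. Add a sales order attribute junk dimension

To add a sales order junk dimension to the existing data warehouse, create a new dimension table named sales_order_attribute_dim. The figure below shows the data warehouse schema after the junk dimension table is added (only the tables related to sales order attributes are shown).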

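The new dimension table has four yes/no columns: verification_ind, credit_check_flag, new_customer_ind and web_order_flag. Each column holds one of two possible values (Y or N), so sales_order_attribute_dim has at most 16 (2^4) rows. The dimension can be preloaded, and it only needs to be loaded once. Note that if a combination is known to be impossible, it does not need to be loaded. Run the script below to modify the database schema. The script creates the sales_order_attribute_dim table, preloads it with all 16 possible rows, adds the junk dimension surrogate key to the sales order fact table, and adds the four corresponding attribute columns to the sales_order table in the source database and to the sales_order staging table in the rds database.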

USE dw;  
  
-- Create the junk dimension table
CREATE TABLE sales_order_attribute_dim (
    sales_order_attribute_sk INT COMMENT 'sales order attribute surrogate key',
    verification_ind CHAR(1) COMMENT 'verification indicator, Y or N',
    credit_check_flag CHAR(1) COMMENT 'credit check flag, Y or N',
    new_customer_ind CHAR(1) COMMENT 'new customer indicator, Y or N',
    web_order_flag CHAR(1) COMMENT 'web order flag, Y or N',
    version int COMMENT 'version',  
    effective_date DATE COMMENT 'effective date',  
    expiry_date DATE COMMENT 'expiry date'
)
clustered by (sales_order_attribute_sk) into 8 buckets    
stored as orc tblproperties ('transactional'='true');  
  
-- Preload the junk dimension with all 16 Y/N combinations
INSERT INTO sales_order_attribute_dim VALUES (1, 'Y', 'N', 'N', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (2, 'Y', 'Y', 'N', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (3, 'Y', 'Y', 'Y', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (4, 'Y', 'Y', 'Y', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (5, 'Y', 'N', 'Y', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (6, 'Y', 'N', 'Y', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (7, 'Y', 'N', 'N', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (8, 'Y', 'Y', 'N', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (9, 'N', 'N', 'N', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (10, 'N', 'Y', 'N', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (11, 'N', 'Y', 'Y', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (12, 'N', 'Y', 'Y', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (13, 'N', 'N', 'Y', 'N', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (14, 'N', 'N', 'Y', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (15, 'N', 'N', 'N', 'Y', 1, '1900-01-01', '2200-01-01');
INSERT INTO sales_order_attribute_dim VALUES (16, 'N', 'Y', 'N', 'Y', 1, '1900-01-01', '2200-01-01');

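-- Add the junk dimension key to the fact table: rename the old table, create the new table with sales_order_attribute_sk after product_sk, copy the existing rows (their key is NULL), then drop the old table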
alter table sales_order_fact rename to sales_order_fact_old;
create table sales_order_fact(                                                
   order_number int COMMENT 'order number',                                     
   customer_sk int COMMENT 'customer surrogate key',                               
   product_sk int COMMENT 'product surrogate key', 
   sales_order_attribute_sk int COMMENT 'sales order attribute surrogate key',
   order_date_sk int COMMENT 'order date surrogate key',                           
   allocate_date_sk int COMMENT 'allocate date surrogate key',                     
   allocate_quantity int COMMENT 'allocate quantity',                              
   packing_date_sk int COMMENT 'packing date surrogate key',                       
   packing_quantity int COMMENT 'packing quantity',                                
   ship_date_sk int COMMENT 'ship date surrogate key',                             
   ship_quantity int COMMENT 'ship quantity',                                      
   receive_date_sk int COMMENT 'receive date surrogate key',                       
   receive_quantity int COMMENT 'receive quantity',                                
   request_delivery_date_sk int COMMENT 'request delivery date surrogate key',     
   order_amount decimal(10,2) COMMENT 'order amount',                              
   order_quantity int COMMENT 'order quantity')  
clustered by (order_number) into 8 buckets    
stored as orc tblproperties ('transactional'='true');
insert into table sales_order_fact
select order_number,
       customer_sk,
       product_sk,
       null,
       order_date_sk,
       allocate_date_sk,
       allocate_quantity,
       packing_date_sk,
       packing_quantity,
       ship_date_sk,
       ship_quantity,
       receive_date_sk,
       receive_quantity,
       request_delivery_date_sk,
       order_amount,
       order_quantity
  from sales_order_fact_old;
drop table sales_order_fact_old;

-- Add the corresponding attribute columns to sales_order in the source database (MySQL)
USE source;
ALTER TABLE sales_order
  ADD verification_ind CHAR (1) AFTER product_code
, ADD credit_check_flag CHAR (1) AFTER verification_ind
, ADD new_customer_ind CHAR (1) AFTER credit_check_flag
, ADD web_order_flag CHAR (1) AFTER new_customer_ind ;

-- Add the corresponding attribute columns to the sales_order staging table in rds
USE rds;
ALTER TABLE sales_order ADD COLUMNS
(
verification_ind CHAR(1) COMMENT 'verification indicator, Y or N',
credit_check_flag CHAR(1) COMMENT 'credit check flag, Y or N',
new_customer_ind CHAR(1) COMMENT 'new customer indicator, Y or N',
web_order_flag CHAR(1) COMMENT 'web order flag, Y or N'
) ;
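
The 16 hand-written INSERT statements above are meant to cover every Y/N combination of the four flags exactly once. One quick way to cross-check that, or to generate the rows for a junk dimension with more flags, is to enumerate the Cartesian product {Y, N}^4. The Python sketch below does this with itertools.product. ROWS is simply a transcription of the surrogate keys and flag values from the INSERT statements, and the helper names are illustrative, not part of the original ETL.

```python
from itertools import product

# (sales_order_attribute_sk, verification_ind, credit_check_flag,
#  new_customer_ind, web_order_flag), transcribed from the INSERTs above
ROWS = [
    (1, "Y", "N", "N", "N"), (2, "Y", "Y", "N", "N"),
    (3, "Y", "Y", "Y", "N"), (4, "Y", "Y", "Y", "Y"),
    (5, "Y", "N", "Y", "N"), (6, "Y", "N", "Y", "Y"),
    (7, "Y", "N", "N", "Y"), (8, "Y", "Y", "N", "Y"),
    (9, "N", "N", "N", "N"), (10, "N", "Y", "N", "N"),
    (11, "N", "Y", "Y", "N"), (12, "N", "Y", "Y", "Y"),
    (13, "N", "N", "Y", "N"), (14, "N", "N", "Y", "Y"),
    (15, "N", "N", "N", "Y"), (16, "N", "Y", "N", "Y"),
]


def all_combinations(n_flags=4):
    """Every possible Y/N assignment for n_flags yes/no columns (2 ** n_flags tuples)."""
    return set(product("YN", repeat=n_flags))


def missing_and_duplicates(rows, n_flags=4):
    """Compare the preloaded flag tuples with the full Cartesian product."""
    flags = [row[1:] for row in rows]          # drop the surrogate key
    missing = all_combinations(n_flags) - set(flags)
    duplicates = len(flags) - len(set(flags))  # > 0 if a combination repeats
    return missing, duplicates


if __name__ == "__main__":
    missing, duplicates = missing_and_duplicates(ROWS)
    print(f"expected {len(all_combinations())} rows, got {len(ROWS)}")
    print(f"missing: {sorted(missing)}, duplicates: {duplicates}")
```

For the 16 rows above, the check finds no missing combinations and no duplicates.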

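2. Rebuild the Sqoop job

The source sales_order table now has four more columns, so the incremental import job must include them in its --columns list. Sqoop's job tool has no option for modifying a saved job, so the script below first saves the job's current incremental.last.value, deletes the job, and then recreates it with the new column list, passing the saved value to --last-value so the next import resumes where the previous one stopped.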

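# Save the incremental.last.value of the existing job
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
# Delete the old job definition
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop
# Recreate the job with the four new columns, resuming from the saved last value
sqoop job \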
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, status_date, entry_date, order_amount, quantity, request_delivery_date, order_status, verification_ind, credit_check_flag, new_customer_ind, web_order_flag" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column id \
--last-value $last_value
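
The first line of the script pulls the saved incremental.last.value out of the `sqoop job --show` output with grep and then takes the third whitespace-separated field with awk. The Python sketch below mirrors that extraction so the logic is explicit. It assumes the relevant output line has the form `incremental.last.value = 132` (key, equals sign, value), which is what `awk '{print $3}'` relies on; the SAMPLE text is a hypothetical excerpt, not captured from a real run.

```python
def extract_last_value(show_output):
    """Mimic `grep incremental.last.value | awk '{print $3}'`.

    grep would print every matching line; like the shell script, this sketch
    assumes there is exactly one and returns the third field of the first match.
    Returns None if no line mentions incremental.last.value.
    """
    for line in show_output.splitlines():
        if "incremental.last.value" in line:
            fields = line.split()
            return fields[2] if len(fields) >= 3 else None
    return None


# Hypothetical excerpt of `sqoop job --show` output (format assumed)
SAMPLE = """\
Job: myjob_incremental_import
incremental.last.value = 132
incremental.col = id
"""

if __name__ == "__main__":
    print(extract_last_value(SAMPLE))
```

One design point worth carrying back into the shell script: check that the extracted value is non-empty before running `sqoop job --delete`, otherwise a failed `--show` would lose the import checkpoint.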

3. Modify the regular load script

Because there is a new dimension, the regular load script must be changed. The modified regular_etl.sql is shown below.

-- Set the properties required for Hive transactions
set hive.support.concurrency=true;    
set hive.exec.dynamic.partition.mode=nonstrict;    
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;    
set hive.compactor.initiator.on=true;    
set hive.compactor.worker.threads=1;    
    
USE dw;    
      
-- Set the effective and expiry dates used for SCD
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);    
SET hivevar:max_date = CAST('2200-01-01' AS DATE);    
      
-- Set the upper bound of the CDC window
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;    
    
-- Load the customer dimension
-- Expire deleted records and SCD2 changes on the address columns; the <=> operator handles NULL values
UPDATE customer_dim     
   SET expiry_date = ${hivevar:pre_date}      
 WHERE customer_dim.customer_sk IN      
(SELECT a.customer_sk     
   FROM (SELECT customer_sk,    
                customer_number,    
                customer_street_address,    
                customer_zip_code,    
                customer_city,    
                customer_state,    
                shipping_address,    
                shipping_zip_code,    
                shipping_city,    
                shipping_state    
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.customer b ON a.customer_number = b.customer_number     
          WHERE b.customer_number IS NULL OR     
          (  !(a.customer_street_address <=> b.customer_street_address)    
          OR !(a.customer_zip_code <=> b.customer_zip_code)    
          OR !(a.customer_city <=> b.customer_city)    
          OR !(a.customer_state <=> b.customer_state)    
          OR !(a.shipping_address <=> b.shipping_address)    
          OR !(a.shipping_zip_code <=> b.shipping_zip_code)    
          OR !(a.shipping_city <=> b.shipping_city)    
          OR !(a.shipping_state <=> b.shipping_state)    
          ));     
    
-- Add new SCD2 rows for changes on the address columns
INSERT INTO customer_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.customer_zip_code,    
    t1.customer_city,    
    t1.customer_state,    
    t1.shipping_address,    
    t1.shipping_zip_code,    
    t1.shipping_city,    
    t1.shipping_state,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.customer_number customer_number,    
    t2.customer_name customer_name,    
    t2.customer_street_address customer_street_address,    
    t2.customer_zip_code customer_zip_code,    
    t2.customer_city customer_city,    
    t2.customer_state customer_state,    
    t2.shipping_address shipping_address,    
    t2.shipping_zip_code shipping_zip_code,    
    t2.shipping_city shipping_city,    
    t2.shipping_state shipping_state,    
    t1.version + 1 version,    
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM customer_dim t1     
INNER JOIN rds.customer t2      
   ON t1.customer_number = t2.customer_number       
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN customer_dim t3     
   ON t1.customer_number = t3.customer_number     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (!(t1.customer_street_address <=> t2.customer_street_address)    
   OR  !(t1.customer_zip_code <=> t2.customer_zip_code)    
   OR  !(t1.customer_city <=> t2.customer_city)    
   OR  !(t1.customer_state <=> t2.customer_state)    
   OR  !(t1.shipping_address <=> t2.shipping_address)    
   OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)    
   OR  !(t1.shipping_city <=> t2.shipping_city)    
   OR  !(t1.shipping_state <=> t2.shipping_state)    
   )    
  AND t3.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
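-- Handle SCD1 on the customer_name column
-- Because the SET clause of Hive's UPDATE does not yet support subqueries, a temporary table stores the records to change, and DELETE followed by INSERT replaces UPDATE
-- Because SCD1 keeps no history, every row in the dimension whose customer_name changed is updated, not just the current version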
DROP TABLE IF EXISTS tmp;    
CREATE TABLE tmp AS    
SELECT    
    a.customer_sk,    
    a.customer_number,    
    b.customer_name,    
    a.customer_street_address,    
    a.customer_zip_code,    
    a.customer_city,    
    a.customer_state,    
    a.shipping_address,    
    a.shipping_zip_code,    
    a.shipping_city,    
    a.shipping_state,    
    a.version,    
    a.effective_date,    
    a.expiry_date    
  FROM customer_dim a, rds.customer b      
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);      
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);      
INSERT INTO customer_dim SELECT * FROM tmp;    
    
-- Add new customer records
INSERT INTO customer_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,    
    t1.customer_number,    
    t1.customer_name,    
    t1.customer_street_address,    
    t1.customer_zip_code,    
    t1.customer_city,    
    t1.customer_state,    
    t1.shipping_address,    
    t1.shipping_zip_code,    
    t1.shipping_city,    
    t1.shipping_state,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number      
 WHERE t2.customer_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;    
    
-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;      
INSERT INTO pa_customer_dim      
SELECT      
  customer_sk      
, customer_number      
, customer_name      
, customer_street_address      
, customer_zip_code      
, customer_city      
, customer_state      
, shipping_address      
, shipping_zip_code      
, shipping_city      
, shipping_state      
, version      
, effective_date      
, expiry_date      
FROM customer_dim      
WHERE customer_state = 'PA' ;     
    
-- Load the product dimension
-- Expire deleted records and SCD2 changes on the product_name and product_category columns
UPDATE product_dim    
   SET expiry_date = ${hivevar:pre_date}      
 WHERE product_dim.product_sk IN      
(SELECT a.product_sk     
   FROM (SELECT product_sk,product_code,product_name,product_category     
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.product b ON a.product_code = b.product_code     
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));    
    
-- Add new SCD2 rows for changes on the product_name and product_category columns
INSERT INTO product_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.product_code product_code,    
    t2.product_name product_name,    
    t2.product_category product_category,        
    t1.version + 1 version,    
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM product_dim t1     
INNER JOIN rds.product t2      
   ON t1.product_code = t2.product_code      
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN product_dim t3     
   ON t1.product_code = t3.product_code     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
    
-- Add new product records
INSERT INTO product_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,    
    t1.product_code,    
    t1.product_name,    
    t1.product_category,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code      
 WHERE t2.product_sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;    
    
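-- Load the sales order fact table
-- New sales orders from the previous day; the junk dimension key is looked up on the four flags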
INSERT INTO sales_order_fact    
SELECT    
    a.order_number,    
    customer_sk,    
    product_sk, 
    g.sales_order_attribute_sk,
    e.order_date_sk,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    null,
    f.request_delivery_date_sk,
    order_amount,    
    quantity    
  FROM    
    rds.sales_order a,     
    customer_dim c,    
    product_dim d,    
    order_date_dim e,  
    request_delivery_date_dim f, 
    sales_order_attribute_dim g,
    rds.cdc_time h
 WHERE 
    a.order_status = 'N'
AND a.customer_number = c.customer_number    
AND a.status_date >= c.effective_date    
AND a.status_date < c.expiry_date    
AND a.product_code = d.product_code    
AND a.status_date >= d.effective_date    
AND a.status_date < d.expiry_date    
AND to_date(a.status_date) = e.order_date   
AND to_date(a.request_delivery_date) = f.request_delivery_date
AND a.verification_ind = g.verification_ind  
AND a.credit_check_flag = g.credit_check_flag  
AND a.new_customer_ind = g.new_customer_ind  
AND a.web_order_flag = g.web_order_flag 
AND a.entry_date >= h.last_load AND a.entry_date < h.current_load ;    

-- Handle the four statuses: allocated, packed, shipped and received
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t2.allocate_date_sk allocate_date_sk,
       t1.quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       allocate_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'A' 
   and to_date(t1.status_date) = t2.allocate_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;

DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t2.packing_date_sk packing_date_sk,
       t1.quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       packing_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'P' 
   and to_date(t1.status_date) = t2.packing_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load; 
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t2.ship_date_sk ship_date_sk,
       t1.quantity ship_quantity,
       t0.receive_date_sk receive_date_sk,
       t0.receive_quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       ship_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'S' 
   and to_date(t1.status_date) = t2.ship_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
select t0.order_number order_number,
       t0.customer_sk customer_sk,
       t0.product_sk product_sk,
       t0.sales_order_attribute_sk,
       t0.order_date_sk order_date_sk,
       t0.allocate_date_sk allocate_date_sk,
       t0.allocate_quantity allocate_quantity,
       t0.packing_date_sk packing_date_sk,
       t0.packing_quantity packing_quantity,
       t0.ship_date_sk ship_date_sk,
       t0.ship_quantity ship_quantity,
       t2.receive_date_sk receive_date_sk,
       t1.quantity receive_quantity,
       t0.request_delivery_date_sk request_delivery_date_sk,
       t0.order_amount order_amount,
       t0.order_quantity order_quantity
  from sales_order_fact t0,
       rds.sales_order t1,
       receive_date_dim t2,
       rds.cdc_time t4
 where t0.order_number = t1.order_number and t1.order_status = 'R' 
   and to_date(t1.status_date) = t2.receive_date
   and t1.entry_date >= t4.last_load and t1.entry_date < t4.current_load;
   
DELETE FROM sales_order_fact WHERE sales_order_fact.order_number IN (SELECT order_number FROM tmp); 
INSERT INTO sales_order_fact SELECT * FROM tmp;

-- Update the last_load field of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
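
The only new logic in the fact load is the lookup of sales_order_attribute_sk: each new order is joined to the junk dimension on all four flags, so every order picks up the single dimension row whose combination matches. The Python sketch below reproduces that join in an in-memory SQLite database (the standard-library sqlite3 module, used only as a stand-in for Hive) with a trimmed-down order table. The function name and the surrogate key numbering (assigned in itertools.product order, not the order used in the Hive preload) are illustrative assumptions. It also shows a side effect of the inner join: an order with a NULL flag matches no dimension row and is silently skipped, which is what a NULL flag in rds.sales_order would do to the Hive insert as well.

```python
import sqlite3
from itertools import product


def load_fact(orders):
    """Join orders to a junk dimension on all four flags.

    orders: (order_number, verification_ind, credit_check_flag,
             new_customer_ind, web_order_flag) tuples.
    Returns {order_number: sales_order_attribute_sk} for orders that matched.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("""CREATE TABLE sales_order_attribute_dim (
        sales_order_attribute_sk INTEGER, verification_ind TEXT,
        credit_check_flag TEXT, new_customer_ind TEXT, web_order_flag TEXT)""")
    # Preload every Y/N combination; keys follow product() order (YYYY = 1 ... NNNN = 16)
    conn.executemany(
        "INSERT INTO sales_order_attribute_dim VALUES (?, ?, ?, ?, ?)",
        [(sk, *flags) for sk, flags in enumerate(product("YN", repeat=4), start=1)],
    )
    conn.execute("""CREATE TABLE sales_order (
        order_number INTEGER, verification_ind TEXT, credit_check_flag TEXT,
        new_customer_ind TEXT, web_order_flag TEXT)""")
    conn.executemany("INSERT INTO sales_order VALUES (?, ?, ?, ?, ?)", orders)
    # Same equality join as the Hive script; NULL = 'Y' is not true, so NULL flags drop the row
    rows = conn.execute("""
        SELECT a.order_number, g.sales_order_attribute_sk
          FROM sales_order a
          JOIN sales_order_attribute_dim g
            ON a.verification_ind = g.verification_ind
           AND a.credit_check_flag = g.credit_check_flag
           AND a.new_customer_ind = g.new_customer_ind
           AND a.web_order_flag = g.web_order_flag""").fetchall()
    conn.close()
    return dict(rows)


if __name__ == "__main__":
    orders = [(133, "Y", "Y", "N", "Y"), (134, "N", "N", "N", "N"), (135, "Y", None, "N", "N")]
    print(load_fact(orders))  # order 135 is absent: its NULL flag matches no dimension row
```

If NULL flags can occur in the source, they need an explicit rule, for example defaulting them with COALESCE before the join or adding an "unknown" row to the dimension; which is appropriate depends on what a missing flag means to the business.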

4. Test the modified regular load

(1) Add eight sales orders with the script below.

USE source;  
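-- Empty staging table with the same columns as sales_order
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

-- Each order below gets a random order date between 2016-07-31 and 2016-08-01, a random amount and a random quantity
SET @start_date := unix_timestamp('2016-07-31');
SET @end_date := unix_timestamp('2016-08-01');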

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (1, 133, 1, 1, 'Y', 'Y', 'N', 'Y', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);  

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (2, 134, 2, 2, 'N', 'N', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);  

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (3, 135, 3, 3, 'Y', 'Y', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity);  

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (4, 136, 4, 4, 'Y', 'N', 'N', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity); 

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (5, 137, 11, 1, 'N', 'Y', 'Y', 'Y', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity); 

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (6, 138, 12, 2, 'N', 'Y', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity); 

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (7, 139, 13, 3, 'Y', 'Y', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity); 

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));    
SET @amount := floor(1000 + rand() * 9000);   
SET @quantity := floor(10 + rand() * 90);      
INSERT INTO temp_sales_order_data VALUES (8, 140, 14, 4, 'Y', 'N', 'Y', 'N', @order_date, 'N', '2016-08-05', @order_date, @amount, @quantity); 

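-- Copy the rows into sales_order: id is passed as NULL so MySQL generates it, and order numbers are reassigned from 133 in status_date order
INSERT INTO sales_order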
select null,    
       @rn:=@rn+1,    
       customer_number,    
       product_code,
       verification_ind,
       credit_check_flag,
       new_customer_ind,
       web_order_flag,  
       status_date,    
       order_status,    
       request_delivery_date,    
       entry_date,    
       order_amount,    
       quantity    
  from temp_sales_order_data t1 ,(select @rn:=132) t2
 order by t1.status_date;  
  
COMMIT;

(2) Run the regular load.

./regular_etl.sh

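(3) Verify the results. The analytical query below can be used to confirm that the load is correct. It computes the percentage of new-customer orders whose credit status was checked.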

USE dw;  
SELECT CONCAT(ROUND(checked / (checked + not_checked) * 100),' % ')  
  FROM (SELECT sum(case when credit_check_flag='Y' then 1 else 0 end) checked,
               sum(case when credit_check_flag='N' then 1 else 0 end) not_checked
          FROM sales_order_fact a, sales_order_attribute_dim b  
         WHERE new_customer_ind = 'Y'              
           AND a.sales_order_attribute_sk = b.sales_order_attribute_sk) t;
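
To know what to expect from the query, the ratio can be worked out directly from the flag values in the test script. The Python sketch below recomputes it from the eight test orders, assuming all eight are loaded and that fact rows loaded before this change still have a NULL sales_order_attribute_sk (so the inner join excludes them). TEST_ORDERS and credit_checked_pct are illustrative names.

```python
# (id in the test script, verification_ind, credit_check_flag,
#  new_customer_ind, web_order_flag), copied from the eight INSERTs in step (1)
TEST_ORDERS = [
    (1, "Y", "Y", "N", "Y"),
    (2, "N", "N", "N", "N"),
    (3, "Y", "Y", "N", "N"),
    (4, "Y", "N", "N", "N"),
    (5, "N", "Y", "Y", "Y"),
    (6, "N", "Y", "Y", "N"),
    (7, "Y", "Y", "Y", "N"),
    (8, "Y", "N", "Y", "N"),
]


def credit_checked_pct(orders):
    """Percentage of new-customer orders whose credit status was checked,
    following the Hive query: round(checked / (checked + not_checked) * 100)."""
    new_customer = [o for o in orders if o[3] == "Y"]
    checked = sum(1 for o in new_customer if o[2] == "Y")
    not_checked = sum(1 for o in new_customer if o[2] == "N")
    return round(checked / (checked + not_checked) * 100)


if __name__ == "__main__":
    print(f"{credit_checked_pct(TEST_ORDERS)} %")  # 3 of 4 new-customer orders -> 75 %
```

Python's round() returns an integer here, whereas Hive's ROUND returns a DOUBLE, so the string Hive produces may carry a decimal part.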

The query result is shown in the figure below.
