首页
学习
活动
专区
圈层
工具
发布
25 篇文章
1
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(四)
2
基于Hadoop生态圈的数据仓库实践 —— ETL(二)
3
基于Hadoop生态圈的数据仓库实践 —— ETL(一)
4
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(三)
5
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(二)
6
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(一)
7
基于Hadoop生态圈的数据仓库实践 —— 概述(二)
8
基于Hadoop生态圈的数据仓库实践 —— 概述(一)
9
基于Hadoop生态圈的数据仓库实践 —— 进阶技术
10
基于Hadoop生态圈的数据仓库实践 —— 进阶技术
11
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(二)
12
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(一)
13
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(六)
14
基于Hadoop生态圈的数据仓库实践 —— ETL(三)
15
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十三)
16
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十二)
17
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十一)
18
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十七)
19
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十六)
20
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十五)
21
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十)
22
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十四)
23
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(九)
24
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(八)
25
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(七)

基于Hadoop生态圈的数据仓库实践 —— ETL(二)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/51837457

二、使用Hive转换、装载数据 1. Hive简介 (1)Hive是什么 Hive是一个数据仓库软件,使用SQL读、写、管理分布式存储上的大数据集。它建立在Hadoop之上,具有以下功能和特点:

  • 通过SQL方便地访问数据,适合执行ETL、报表、数据分析等数据仓库任务。
  • 提供一种机制,给各种各样的数据格式加上结构。
  • 直接访问HDFS的文件,或者访问如HBase的其它数据存储。
  • 可以通过MapReduce、Spark或Tez等多种计算框架执行查询。

Hive提供标准的SQL功能,包括2003以后的标准和2011标准中的分析特性。Hive中的SQL还可以通过用户定义的函数(UDFs)、用户定义的聚合函数(UDAFs)、用户定义的表函数(UDTFs)进行扩展。Hive内建连接器支持CSV文本文件、Parquet、ORC等多种数据格式,用户也可以扩展支持其它格式的连接器。Hive被设计成一个可扩展的、高性能的、容错的、与输入数据格式松耦合的系统,适合于数据仓库中的汇总、分析、即时查询等任务,而不适合联机事务处理的工作场景。Hive有HCatalog和WebHCat两个组件。

  • HCatalog是Hadoop的表和存储管理层,允许使用Pig和MapReduce等数据处理工具的用户更容易读写集群中的数据。
  • WebHCat提供了一个服务,可以使用HTTP接口执行MapReduce(或YARN)、Pig、Hive作业或元数据操作。

(2)Hive的体系结构 Hive的体系结构如下图所示。

Hive建立在Hadoop的分布式文件系统(HDFS)和MapReduce系统之上。图中显示了Hadoop 1和Hadoop 2中的两种MapReduce组件。在Hadoop 1中,Hive查询被转化成MapReduce代码,并且使用第一版的MapReduce框架执行,如JobTracker和TaskTracker。在Hadoop 2中,YARN将资源管理和调度从MapReduce框架中解耦。Hive查询仍然被转化为MapReduce代码并执行,但使用的是YARN框架和第二版的MapReduce。 为了更好地理解Hive如何与Hadoop的基本组件一起协同工作,可以把Hadoop看做一个操作系统,HDFS和MapReduce是这个操作系统的组成部分,而象Hive、HBase这些组件,则是操作系统的上层应用或功能。Hadoop生态圈的通用底层架构是,HDFS提供分布式存储,MapReduce为上层功能提供并行处理能力。 在HDFS和MapReduce之上,图中显示了Hive驱动程序和元数据存储。Hive驱动程序(及其编译器)负责编译、优化和执行HiveQL。依赖于具体情况,Hive驱动程序可能选择在本地执行Hive语句或命令,也可能是产生一个MapReduce作业。Hive驱动程序把元数据存储在数据库中。 缺省配置下,Hive在内建的Derby关系数据库系统中存储元数据,这种方式被称为嵌入模式。在这种模式下,Hive驱动程序、元数据存储和Derby全部运行在同一个Java虚拟机中(JVM)。 这种配置适合于学习的目的,它只支持单一Hive会话,所以不能用于多用户的生产环境。还有两种模式——本地和远程——可以更好地支持Hive的多会话生产环境。而且,可以配置任何与JDBC API兼容的关系数据库系统用作存储元数据(例如MySQL、Oracle等)。 对应用支持的关键组件是Hive Thrift服务,它允许一个富客户端集访问Hive,开源的SQuirreL SQL客户端被作为示例包含其中。任何与JDBC兼容的应用,都可以通过绑定的的JDBC驱动访问Hive。与ODBC兼容的客户端,如Linux下典型的unixODBC和isql应用程序,可以从远程Linux客户端访问Hive。如果在客户端安装了相应的ODBC驱动,甚至可以从微软的Excel访问Hive。通过Thrift还可以用Java以外的程序语言(如PHP或Python)访问Hive。就像JDBC、ODBC一样,Thrift客户端通过Thrift服务器访问Hive。 架构图的最上面包括一个命令行接口(CLI),可以在Linux终端窗口向Hive驱动程序直接发出查询或管理命令。还有一个简单的Web界面,通过它可以从浏览器访问Hive管理表及其数据。 (3)Hive的工作流程 Hive的工作流(第一版的MapReduce)如下图所示。

下表说明Hive如何与Hadoop框架进行交互。

步骤号

操作

1

执行查询 从Hive的CLI或Web UI发查询命令给驱动程序(任何JDBC、ODBC数据库驱动)执行。

2

获得计划 驱动程序请求查询编译器解析查询、检查语法、生成查询计划或者查询所需要的资源。

3

获取元数据 编译器向元数据存储(数据库)发送元数据请求。

4

发送元数据 作为响应,元数据存储发向编译器发送元数据。

5

发送计划 编译器检查需要的资源,并把查询计划发送给驱动程序。至此,查询解析完成。

6

执行计划 驱动程序向执行引擎发送执行计划。

7

执行作业 在内部,执行计划的处理是一个MapReduce作业。执行引擎向Name node上的JobTracker进程发送作业,JobTracker把作业分配给Data node上的TaskTracker进程。此时,查询执行MapReduce作业。

7.1

操作元数据 执行作业的同时,执行引擎可能会执行元数据操作(DDL等)。

8

取回结果 执行引擎从Data node接收结果。

9

发送结果 执行引擎向驱动程序发送合成的结果值。

10

发送结果 驱动程序向Hive接口(CLI或Web UI)发送结果。

(4)Hive的事务支持 在前面搭建示例环境时曾提到,Hive从0.14版本开始支持事务和行级更新。但是到目前为止,Hive对事务的支持有很多限制。

  • 不支持BEGIN、COMMIT和ROLLBACK,所有操作都是自动提交。
  • 只支持ORC文件格式。
  • 缺省配置不支持事务。
  • 表必须分桶(bucket)。
  • 只支持快照级别的隔离,提供查询开始时的数据快照,不支持脏读、读提交、可重复读和串行化等隔离级别。
  • ZooKeeper和内存锁管理器与事务不兼容。

2. 初始装载 在数据仓库可以使用前,需要装载历史数据。这些历史数据是导入进数据仓库的第一个数据集合。首次装载被称为初始装载,一般是一次性工作。由最终用户来决定有多少历史数据进入数据仓库。例如,数据仓库使用的开始时间是2015年3月1日,而用户希望装载两年的历史数据,那么应该初始装载2013年3月1日到2015年2月28日之间的源数据。在2015年3月2日装载2015年3月1日的数据,之后周期性地每天装载前一天的数据。在装载事实表前,必须先装载所有的维度表。因为事实表需要维度的代理键。这不仅针对初始装载,也针对定期装载。本节说明执行初始装载的步骤,包括标识源数据、维度历史的处理、使用HiveQL开发和验证初始装载过程。 设计开发初始装载步骤前需要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据,并了解数据源的特性,例如文件类型、记录结构和可访问性等。下表显示的是本示例中销售订单数据仓库需要的源数据的关键信息,包括源数据表、对应的数据仓库目标表等属性。这类表格通常称作数据源对应图,因为它反应了每个从源数据到目标数据的对应关系。生成这个表格的过程叫做数据源映射。在本示例中,客户和产品的源数据直接与其数据仓库里的目标表,customer_dim和product_dim表相对应。另一方面,销售订单事务表是多个数据仓库表的源。

源数据

源数据类型

文件名/表名

数据仓库中的目标表

客户

MySQL表

customer

customer_dim

产品

MySQL表

product

product_dim

销售订单

MySQL表

sales_order

order_dim

sales_order_fact

date_dim(如果使用“从源数据装载日期”方法,本示例中使用的预装载)

标识出了数据源,现在要考虑维度历史的处理。大多数维度值是随着时间改变的。客户改变了姓名,产品的名称或分类变化等。当一个维度改变,比如一个产品有了新的分类,必须要维护维度的历史。在这种情况下,product_dim表里必须既存储产品老的分类,也存储产品当前的分类。并且,老的销售订单里的产品分类信息引用老的分类。渐变维(SCD)即是一种在多维数据仓库中实现维度历史的技术。有三种不同的SCD技术:SCD 类型1(SCD1),SCD类型2(SCD2),SCD类型3(SCD3):

  • SCD1通过修改维度记录直接覆盖已存在的值,它不维护记录的历史。SCD1一般用于修改错误的数据。
  • SCD2在源数据发生变化时,给维度记录建立一个新的“版本”,从而维护维度历史。SCD2不删除、修改已存在的数据。
  • SCD3保持维度记录的一个版本。它通过给某个数据单元增加多个列来维护历史。例如,为了维护客户地址,customer_dim维度表有一个customer_address列和一个previous_customer_address列。SCD3可以有效维护有限的历史,而不像SCD2那样维护全部历史。SCD3很少使用。它只适用于数据库空间不足并且用户接受有限维度历史的情况。

在本示例中,客户维度历史的客户名称使用SCD1,客户地址使用SCD2,产品维度历史的产品名称和产品类型属性使用SCD2。 现在可以编写用于初始装载的脚本了。假设数据仓库从2016年7月4日开始使用,用户希望装载所有的历史数据。下面的init_etl.sh脚本用于完成初始装载过程。

代码语言:javascript
复制
#!/bin/bash
# 建立Sqoop增量导入作业,以order_number作为检查列,初始的last-value是0
sqoop job --delete myjob_incremental_import
sqoop job --create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value 0
# 首次抽取,将全部数据导入RDS库
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table customer --hive-import --hive-table rds.customer --hive-overwrite
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table product --hive-import --hive-table rds.product --hive-overwrite
beeline -u jdbc:hive2://cdh2:10000/dw -e "TRUNCATE TABLE rds.sales_order"
# 执行增量导入,因为last-value初始值为0,所以此次会导入全部数据
sqoop job --exec myjob_incremental_import
# 调用init_etl.sql文件执行初始装载
beeline -u jdbc:hive2://cdh2:10000/dw -f init_etl.sql

init_etl.sql文件中的HiveQL脚本如下:

代码语言:javascript
复制
USE dw;
-- 清空表
TRUNCATE TABLE customer_dim;
TRUNCATE TABLE product_dim;
TRUNCATE TABLE order_dim;
TRUNCATE TABLE sales_order_fact;
-- 装载客户维度表
INSERT INTO customer_dim
SELECT
  ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max
, t1.customer_number
, t1.customer_name
, t1.customer_street_address
, t1.customer_zip_code
, t1.customer_city
, t1.customer_state
, 1
, '2016-03-01'
, '2200-01-01'
FROM
rds.customer t1 CROSS JOIN (SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2; 
-- 装载产品维度表
INSERT INTO product_dim
SELECT
  ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max
, product_code
, product_name
, product_category
, 1
, '2016-03-01'
, '2200-01-01'
FROM
rds.product t1 CROSS JOIN (SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;
-- 装载订单维度表
INSERT INTO order_dim  
SELECT  
ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max
, order_number  
, 1  
, order_date 
, '2200-01-01'  
FROM rds.sales_order t1 CROSS JOIN (SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;  
-- 装载销售订单事实表
INSERT INTO sales_order_fact
SELECT
  order_sk
, customer_sk
, product_sk
, date_sk
, order_amount
FROM
  rds.sales_order a
, order_dim b
, customer_dim c
, product_dim d
, date_dim e
WHERE
    a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.product_code = d.product_code
AND to_date(a.order_date) = e.date;

说明:

  • 时间粒度为每天,也就是说,一天内的发生的数据变化将被忽略,以一天内最后的数据版本为准。
  • 使用了窗口函数ROW_NUMBER()实现生成代理键。
  • 客户和产品维度的生效日期是2016年3月1日。装载的销售订单不会早于该日期,也就是说,不需要更早的客户和产品维度数据。
  • 订单维度的生效日期显然就是订单生成的日期(order_date字段)。为了使所有维度表具有相同的粒度,订单维度的生效日期字段只保留到日期,忽略时间。
  • 销售订单事实表的外键列引用维度表的代理键。
  • date_dim维度表的数据已经预生成,日期从2000年1月1日到2020年12月31日。

使用下面的命令执行初始装载。

代码语言:javascript
复制
./init_etl.sh

使用下面的查询验证初始装载的正确性。

代码语言:javascript
复制
USE dw;

SELECT
  order_number
, customer_name
, product_name
, date
, order_amount amount
FROM
  sales_order_fact a
, customer_dim b
, product_dim c
, order_dim d
, date_dim e
WHERE
    a.customer_sk = b.customer_sk
AND a.product_sk = c.product_sk
AND a.order_sk = d.order_sk
AND a.order_date_sk = e.date_sk
ORDER BY order_number;

此查询应该返回100条数据,如下图所示。

3. 定期装载 初始装载只在开始数据仓库使用前执行一次,然而,必须要按时调度定期执行装载源数据的过程。本节说明执行定期装载的步骤,包括识别源数据与装载类型、使用HiveQL开发和测试定期装载过程。 定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。下表汇总了本示例的这些信息。

源数据

RDS

数据仓库

抽取模式

维度历史装载类型

customer

customer

customer_dim

整体、拉取

address列上SCD2 name列上SCD1

product

product

product_dim

整体、拉取

SCD2

sales_order

sales_order

order_dim

CDC(每天)、拉取

唯一订单号

sales_order_fact

CDC(每天)、拉取

n/a

n/a

n/a

date_dim

n/a

预装载

本示例中order_dim维度表和sales_order_fact使用基于时间戳的CDC抽取模式。为此在RDS库中建立一个名为cdc_time的时间戳表,这个表里有两个字段,一个是last_load,一个是current_load。之所以需要两个字段,是因为在装载过程中,可能会有新的数据被插入或更新,为了避免脏读和死锁的情况,最好给时间戳设定一个上限条件,即current_load字段。本示例的时间粒度为每天,所以时间戳只要保留日期部分即可。这两个字段的初始值是“初始加载”执行的日期,本示例中为'2016-07-04'。当开始装载时,current_load设置为当前日期。在开始定期装载实验前,先使用下面的脚本建立时间戳表。

代码语言:javascript
复制
USE rds;

DROP TABLE IF EXISTS cdc_time ;
CREATE TABLE cdc_time
(  
 last_load date,
 current_load date
);

SET hivevar:last_load = DATE_ADD(CURRENT_DATE(),-1); 
INSERT OVERWRITE TABLE cdc_time SELECT ${hivevar:last_load}, ${hivevar:last_load} ;

使用下面的regular_etl.sh脚本完成定期装载过程。

代码语言:javascript
复制
#!/bin/bash
# 整体拉取customer、product表数据
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table customer --hive-import --hive-table rds.customer --hive-overwrite
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table product --hive-import --hive-table rds.product --hive-overwrite
# 执行增量导入
sqoop job --exec myjob_incremental_import
# 调用 regular_etl.sql 文件执行定期装载
beeline -u jdbc:hive2://cdh2:10000/dw -f regular_etl.sql

regular_etl.sql文件中的HiveQL脚本如下:

代码语言:javascript
复制
-- 设置变量以支持事务
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;

USE dw;
  
-- 设置SCD的生效时间和过期时间
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
  
-- 设置CDC的上限时间
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

-- 装载customer维度
-- 设置已删除记录和customer_street_addresses列上SCD2的过期  
UPDATE customer_dim 
   SET expiry_date = ${hivevar:pre_date}  
 WHERE customer_dim.customer_sk IN  
(SELECT a.customer_sk 
   FROM (SELECT customer_sk,customer_number,customer_street_address 
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN 
                rds.customer b ON a.customer_number = b.customer_number 
          WHERE b.customer_number IS NULL OR a.customer_street_address <> b.customer_street_address); 

-- 处理customer_street_addresses列上SCD2的新增行  
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.version,  
    t1.effective_date,  
    t1.expiry_date  
FROM  
(  
SELECT  
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code,
    t2.customer_city,
    t2.customer_state,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,  
    ${hivevar:max_date} expiry_date  
 FROM customer_dim t1 
INNER JOIN rds.customer t2  
   ON t1.customer_number = t2.customer_number   
  AND t1.expiry_date = ${hivevar:pre_date}  
 LEFT JOIN customer_dim t3 
   ON t1.customer_number = t3.customer_number 
  AND t3.expiry_date = ${hivevar:max_date}  
WHERE t1.customer_street_address <> t2.customer_street_address AND t3.customer_sk IS NULL) t1  
CROSS JOIN  
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- 处理customer_name列上的SCD1
-- 因为hive里update的set子句还不支持子查询,所以这里使用了一个临时表存储需要更新的记录,用先delete再insert代替update,为简单起见也不考虑并发问题(数据仓库应用的并发操作基本都是只读的,很少并发写,所以并发导致的问题并不像OLTP那样严重)。
-- 因为SCD1本身就不保存历史数据,所以这里更新维度表里的所有customer_name改变的记录,而不是仅仅更新当前版本的记录
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b  
 WHERE a.customer_number = b.customer_number AND (a.customer_name <> b.customer_name);  
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);  
INSERT INTO customer_dim SELECT * FROM tmp;

-- 处理新增的customer记录 
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state, 
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM  
(  
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number  
 WHERE t2.customer_sk IS NULL) t1  
CROSS JOIN  
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- 装载product维度
-- 设置已删除记录和product_name、product_category列上SCD2的过期
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}  
 WHERE product_dim.product_sk IN  
(SELECT a.product_sk 
   FROM (SELECT product_sk,product_code,product_name,product_category 
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN 
                rds.product b ON a.product_code = b.product_code 
          WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

-- 处理product_name、product_category列上SCD2的新增行  
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM  
(  
SELECT  
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,    
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,  
    ${hivevar:max_date} expiry_date  
 FROM product_dim t1 
INNER JOIN rds.product t2  
   ON t1.product_code = t2.product_code  
  AND t1.expiry_date = ${hivevar:pre_date}  
 LEFT JOIN product_dim t3 
   ON t1.product_code = t3.product_code 
  AND t3.expiry_date = ${hivevar:max_date}  
WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1  
CROSS JOIN  
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- 处理新增的product记录
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM  
(  
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code  
 WHERE t2.product_sk IS NULL) t1  
CROSS JOIN  
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- 装载order维度
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
  FROM
(
SELECT
    order_number order_number,
    1 version,
    order_date effective_date,
    '2200-01-01' expiry_date
  FROM rds.sales_order, rds.cdc_time 
 WHERE entry_date >= last_load AND entry_date < current_load ) t1
CROSS JOIN  
(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;

-- 装载销售订单事实表
INSERT INTO sales_order_fact
SELECT
    order_sk,
    customer_sk,
    product_sk,
    date_sk,
    order_amount
  FROM
    rds.sales_order a,
    order_dim b,
    customer_dim c,
    product_dim d,
    date_dim e,
    rds.cdc_time f
 WHERE
    a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.order_date >= c.effective_date
AND a.order_date < c.expiry_date
AND a.product_code = d.product_code
AND a.order_date >= d.effective_date
AND a.order_date < d.expiry_date
AND to_date(a.order_date) = e.date
AND a.entry_date >= f.last_load AND a.entry_date < f.current_load ;

-- 更新时间戳表的last_load字段
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;

说明:

  • customer和product表分别通过rds.customer和rds.product表导入customer_dim和product_dim表。
  • 客户地址、产品名称和产品分类使用SCD2,客户姓名使用SCD1。
  • 上次执行定期装载的日期到当前日期之间的销售订单被装载到rds.order_dim和sales_order_fact表。

测试步骤: (1)执行下面的SQL脚本准备源数据库中的客户、产品和销售订单测试数据。

代码语言:javascript
复制
USE source;

/***   
客户数据的改变如下:
客户6的街道号改为7777 Ritter Rd。(原来是7070 Ritter Rd)
客户7的姓名改为Distinguished Agencies。(原来是Distinguished Partners)
新增第八个客户。
***/
UPDATE customer SET customer_street_address = '7777 Ritter Rd.' WHERE customer_number = 6 ;
UPDATE customer SET customer_name = 'Distinguished Agencies' WHERE customer_number = 7 ;
INSERT INTO customer
(customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
VALUES
('Subsidiaries', '10000 Wetline Blvd.', 17055, 'Pittsburgh', 'PA') ;

/***
产品数据的改变如下:
产品3的名称改为Flat Panel。(原来是LCD Panel)
新增第四个产品。
***/
UPDATE product SET product_name = 'Flat Panel' WHERE product_code = 3 ;
INSERT INTO product
(product_name, product_category)
VALUES
('Keyboard', 'Peripheral') ; 
  
/***  
新增订单日期为2016年7月4日的16条订单。  
***/
SET @start_date := unix_timestamp('2016-07-04');
SET @end_date := unix_timestamp('2016-07-05');
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0; 

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (101, 1, 1, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (102, 2, 2, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (103, 3, 3, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (104, 4, 4, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (105, 5, 2, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (106, 6, 2, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (107, 7, 3, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (108, 8, 4, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (109, 1, 1, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (110, 2, 2, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (111, 3, 3, @order_date, @order_date, @amount);
  
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (112, 4, 4, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (113, 5, 1, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (114, 6, 2, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (115, 7, 3, @order_date, @order_date, @amount);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (116, 8, 4, @order_date, @order_date, @amount);

INSERT INTO sales_order
SELECT NULL,customer_number,product_code,order_date,entry_date,order_amount FROM temp_sales_order_data ORDER BY order_date;  

COMMIT ; 

新增的16条销售订单如下图所示。

(2)执行regular_etl.sh脚本进行定期装载。

代码语言:javascript
复制
./regular_etl.sh

(3)使用下面的查询验证结果。

代码语言:javascript
复制
use dw;
select * from customer_dim;

客户6的地址变更使用了SCD2,客户7的姓名变更使用了SCD1,新增了客户8。注意客户6第一个版本的到期日期和第二个版本的生效日期同为'2016-07-04',这是因为任何一个SCD的有效期是一个“左闭右开”的区间,以客户6为例,其第一个版本的有效期大于等于'2016-03-01',小于'2016-07-04',即为'2016-03-01'到'2016-07-03'。如下图所示。

代码语言:javascript
复制
select * from product_dim;

产品3的名称变更使用了SCD2,新增了产品4。如下图所示。

代码语言:javascript
复制
select * from order_dim;

现在有116个订单,100个是“初始导入”装载的,16个是本次定期装载的。如下图所示。

代码语言:javascript
复制
select * from sales_order_fact;

2017年7月4日的16个销售订单被添加,产品3的代理键是4而不是3,客户6的代理键是8而不是6。如下图所示。

代码语言:javascript
复制
select * from rds.cdc_time;

时间戳表的最后装载日期已经更新。如下图所示。

以上示例说明了如何用Sqoop和HiveQL实现初始装载和定期装载。需要指出的一点是,就本示例的环境和数据量而言装载执行速度很慢,需要二十多分钟,比关系数据库慢多了。但Hive本身就只适合大数据量的批处理任务,再加上Hive的性能问题一直就被诟病,也就不必再吐槽了。

下一篇
举报
领券