首页
学习
活动
专区
圈层
工具
发布
25 篇文章
1
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(四)
2
基于Hadoop生态圈的数据仓库实践 —— ETL(二)
3
基于Hadoop生态圈的数据仓库实践 —— ETL(一)
4
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(三)
5
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(二)
6
基于Hadoop生态圈的数据仓库实践 —— 环境搭建(一)
7
基于Hadoop生态圈的数据仓库实践 —— 概述(二)
8
基于Hadoop生态圈的数据仓库实践 —— 概述(一)
9
基于Hadoop生态圈的数据仓库实践 —— 进阶技术
10
基于Hadoop生态圈的数据仓库实践 —— 进阶技术
11
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(二)
12
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(一)
13
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(六)
14
基于Hadoop生态圈的数据仓库实践 —— ETL(三)
15
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十三)
16
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十二)
17
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十一)
18
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十七)
19
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十六)
20
基于hadoop生态圈的数据仓库实践 —— 进阶技术(十五)
21
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十)
22
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十四)
23
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(九)
24
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(八)
25
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(七)

基于Hadoop生态圈的数据仓库实践 —— 进阶技术(七)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://cloud.tencent.com/developer/article/1433115

七、递归

代码语言:txt
复制
    数据仓库中的关联实体经常表现为一种“父—子”关系。在这种类型的关系中,一个父亲可能有多个孩子,而一个孩子只能属于一个父亲。例如,一个人只能被分配到一个部门,而一个部门可能被分配许多人。“父—子”之间是一种递归型树结构,是一种最理想、最灵活的存储层次树的数据结构。本节说明一些递归处理的问题,包括数据装载、树的展开、递归查询、树的平面化等技术实现。为了保持销售订单示例的完整性,本节的实验将会使用另一个与业务无关的通用示例。
  1. 建立表并添加实验数据
代码语言:javascript
复制
-- 在MySQL的source库中建立源表
use source;  
create table tree  
(  
  c_child   int,  
  c_name    varchar(100),  
  c_parent  int  
);  
  
create index idx1 on tree (c_parent);  
create unique index tree_pk on tree (c_child);  

-- 递归树结构,c_child是主键,c_parent是引用c_child的外键
alter table tree add (constraint tree_pk primary key (c_child));  
alter table tree add (constraint tree_r01 foreign key (c_parent) references tree (c_child));  

-- 添加数据
insert into tree (c_child, c_name, c_parent) values (1, '节点1', null);  
insert into tree (c_child, c_name, c_parent) values (2, '节点2', 1);  
insert into tree (c_child, c_name, c_parent) values (3, '节点3', 1);  
insert into tree (c_child, c_name, c_parent) values (4, '节点4', 1);  
insert into tree (c_child, c_name, c_parent) values (5, '节点5', 2);  
insert into tree (c_child, c_name, c_parent) values (6, '节点6', 2);  
insert into tree (c_child, c_name, c_parent) values (7, '节点7', 3);  
insert into tree (c_child, c_name, c_parent) values (8, '节点8', 3);  
insert into tree (c_child, c_name, c_parent) values (9, '节点9', 3);  
insert into tree (c_child, c_name, c_parent) values (10, '节点10', 4);  
insert into tree (c_child, c_name, c_parent) values (11, '节点11', 4);  
commit;  

-- 在Hive的rds库中建立过渡表
use rds;
create table tree  
(  
  c_child   int,  
  c_name    varchar(100),  
  c_parent  int  
);

-- 在Hive的dw库中建立相关维度表
use dw;  
create table tree_dim  
(  
  sk             int,  
  c_child        int,  
  c_name         varchar(100),  
  c_parent       int,
  version        int,  
  effective_date date,  
  expiry_date    date  
)
CLUSTERED BY (sk) INTO 8 BUCKETS    
STORED AS ORC TBLPROPERTIES ('transactional'='true');

2. 数据装载

代码语言:txt
复制
    递归树结构的本质是,在任意时刻,每个父—子关系都是唯一的。通常,操作型系统只维护层次树的当前视图,因此,输入数据仓库的数据通常是当前层次树的时间点快照。这就需要由ETL过程来确定发生了哪些变化,以便正确记录历史信息。为了检测出过时的父—子关系,必须通过孩子键进行查询,然后将父亲作为结果返回。在这个例子中,对tree表采用整体拉取模式抽数据,tree\_dim表的c\_name和c\_parent列上使用SCD2装载类型。也就是说,把c\_parent当做源表的一个普通属性,当一个节点的名字或者父节点发生变化时,都增减一条新版本记录,并设置老版本的过期时间。这样的装载过程和销售订单的例子并无二致。

(1)建立init_etl_tree.sh、init_etl_tree.sql、regular_etl_tree.sh、regular_etl_tree.sql四个脚本实现tree_dim维度表的初始装载和定期装载。

init_etl_tree.sh文件用于初始装载,其内容如下:

代码语言:javascript
复制
#!/bin/bash
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password myassword --table tree --hive-import --hive-table rds.tree --hive-overwrite
beeline -u jdbc:hive2://cdh2:10000/dw -f init_etl_tree.sql

init_etl_tree.sql文件内容如下:

代码语言:javascript
复制
USE dw;  
-- 清空表  
TRUNCATE TABLE tree_dim;
INSERT INTO tree_dim  
SELECT  
  ROW_NUMBER() OVER (ORDER BY t1.c_child) + t2.sk_max  
, t1.c_child  
, t1.c_name  
, t1.c_parent  
, 1  
, '2016-03-01'  
, '2200-01-01'  
FROM  
rds.tree t1 CROSS JOIN (SELECT COALESCE(MAX(sk),0) sk_max FROM tree_dim) t2;

regular_etl_tree.sh文件用于定期装载,其内容如下:

代码语言:javascript
复制
#!/bin/bash
sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password myassword --table tree --hive-import --hive-table rds.tree --hive-overwrite
beeline -u jdbc:hive2://cdh2:10000/dw -f regular_etl_tree.sql

regular_etl_tree.sql文件内容如下:

代码语言:javascript
复制
-- 设置变量以支持事务    
set hive.support.concurrency=true;    
set hive.exec.dynamic.partition.mode=nonstrict;    
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;    
set hive.compactor.initiator.on=true;    
set hive.compactor.worker.threads=1;    
    
USE dw;    
      
-- 设置SCD的生效时间和过期时间    
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);    
SET hivevar:max_date = CAST('2200-01-01' AS DATE);    
      
-- 设置CDC的上限时间    
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;    

-- SDC2设置过期
UPDATE tree_dim     
   SET expiry_date = ${hivevar:pre_date}      
 WHERE tree_dim.sk IN      
(SELECT a.sk     
   FROM (SELECT sk,    
                c_child,    
                c_name,    
                c_parent    
           FROM tree_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN     
                rds.tree b ON a.c_child = b.c_child     
          WHERE b.c_child IS NULL OR     
          (  !(a.c_name <=> b.c_name)    
          OR !(a.c_parent <=> b.c_parent)    
          ));     
    
-- SCD2新增版本
INSERT INTO tree_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.c_child) + t2.sk_max,    
    t1.c_child,    
    t1.c_name,    
    t1.c_parent,    
    t1.version,    
    t1.effective_date,    
    t1.expiry_date    
FROM      
(      
SELECT      
    t2.c_child c_child,    
    t2.c_name c_name,    
    t2.c_parent c_parent,    
    t1.version + 1 version,    
    ${hivevar:pre_date} effective_date,      
    ${hivevar:max_date} expiry_date      
 FROM tree_dim t1     
INNER JOIN rds.tree t2      
   ON t1.c_child = t2.c_child       
  AND t1.expiry_date = ${hivevar:pre_date}      
 LEFT JOIN tree_dim t3     
   ON t1.c_child = t3.c_child     
  AND t3.expiry_date = ${hivevar:max_date}      
WHERE (!(t1.c_name <=> t2.c_name)    
   OR  !(t1.c_parent <=> t2.c_parent)    
   )    
  AND t3.sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(sk),0) sk_max FROM tree_dim) t2;    
    
-- 新增的记录     
INSERT INTO tree_dim    
SELECT    
    ROW_NUMBER() OVER (ORDER BY t1.c_child) + t2.sk_max,    
    t1.c_child,    
    t1.c_name,    
    t1.c_parent,    
    1,    
    ${hivevar:pre_date},    
    ${hivevar:max_date}    
FROM      
(      
SELECT t1.* FROM rds.tree t1 LEFT JOIN tree_dim t2 ON t1.c_child = t2.c_child      
 WHERE t2.sk IS NULL) t1      
CROSS JOIN      
(SELECT COALESCE(MAX(sk),0) sk_max FROM tree_dim) t2;    

-- 更新时间戳表的last_load字段    
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;

(2)测试装载过程

代码语言:txt
复制
    初始时源表数据的递归树结构如下图所示:
代码语言:txt
复制
    执行初始装载
代码语言:javascript
复制
./init_etl_tree.sh
代码语言:txt
复制
    查询维度表数据
代码语言:javascript
复制
select * from dw.tree_dim;
代码语言:txt
复制
    结果如下图所示:
代码语言:txt
复制
    从查询结果看到,维度表中新增全部11条记录。
代码语言:txt
复制
    修改源表数据,这次修改了所有节点的名称
代码语言:javascript
复制
use source;
-- 修改名称
UPDATE tree SET c_name = '节点1_1' WHERE c_child = 1;  
UPDATE tree SET c_name = '节点2_1' WHERE c_child = 2;  
UPDATE tree SET c_name = '节点3_1' WHERE c_child = 3;  
UPDATE tree SET c_name = '节点4_1' WHERE c_child = 4;  
UPDATE tree SET c_name = '节点5_1' WHERE c_child = 5;  
UPDATE tree SET c_name = '节点6_1' WHERE c_child = 6;  
UPDATE tree SET c_name = '节点7_1' WHERE c_child = 7;  
UPDATE tree SET c_name = '节点8_1' WHERE c_child = 8;  
UPDATE tree SET c_name = '节点9_1' WHERE c_child = 9;  
UPDATE tree SET c_name = '节点10_1' WHERE c_child = 10;  
UPDATE tree SET c_name = '节点11_1' WHERE c_child = 11;  
COMMIT; 
代码语言:txt
复制
    将regular\_etl.sql文件中的SET hivevar:cur\_date = CURRENT\_DATE();行改为SET hivevar:cur\_date = '2016-07-27';后,执行定期装载
代码语言:javascript
复制
./regular_etl_tree.sh
代码语言:txt
复制
    查询维度表数据
代码语言:javascript
复制
select * from dw.tree_dim;
代码语言:txt
复制
    结果如下图所示:
代码语言:txt
复制
    从查询结果看到,现在维度表中共有22条记录,其中新增11条当前版本记录,老版本的11条记录的过期时间字段被设置为'2016-07-26'。
代码语言:txt
复制
    修改源表数据,这次修改了部分节点的名称,并新增了两个节点。
代码语言:javascript
复制
use source;
/*** 修改名称 ***/  
UPDATE tree SET c_name = '节点1_2' WHERE c_child = 1;  
UPDATE tree SET c_name = '节点3_2' WHERE c_child = 3;  
UPDATE tree SET c_name = '节点5_2' WHERE c_child = 5;  
UPDATE tree SET c_name = '节点8_2' WHERE c_child = 8;  
UPDATE tree SET c_name = '节点11_2' WHERE c_child = 11;

/*** 增加新的根节点,并改变原来的父子关系 ***/  
INSERT INTO tree VALUES (12, '节点12', NULL);  
INSERT INTO tree VALUES (13, '节点13', 12);  
UPDATE tree SET c_parent = 12 WHERE c_child = 1;  
UPDATE tree SET c_parent = 13 WHERE c_child = 3;  
  
COMMIT;
代码语言:txt
复制
    此时源表数据的递归树结构如下图所示:
代码语言:txt
复制
    将regular\_etl.sql文件中的SET hivevar:cur\_date = CURRENT\_DATE();行改为SET hivevar:cur\_date = '2016-07-28';后,执行定期装载
代码语言:javascript
复制
./regular_etl_tree.sh
代码语言:txt
复制
    查询维度表数据
代码语言:javascript
复制
select * from dw.tree_dim;
代码语言:txt
复制
    结果如下图所示:
代码语言:txt
复制
    从查询结果看到,现在维度表中共有29条记录,其中新增7条当前版本记录(5行因为改名,其中1、3既改名又更新父子关系,2行新增节点),更新了5行老版本的过期时间,被设置为'2016-07-27'。
代码语言:txt
复制
    修改源表数据,这次修改了部分节点的名称,并删除了三个节点。
代码语言:javascript
复制
use source;
/*** 修改名称 ***/ 
UPDATE tree SET c_name = '节点2_2' WHERE c_child = 2;  
UPDATE tree SET c_name = '节点3_3' WHERE c_child = 3;  

/*** 删除子树 ***/  
DELETE FROM tree WHERE c_child = 10;  
DELETE FROM tree WHERE c_child = 11;  
DELETE FROM tree WHERE c_child = 4;  
  
COMMIT; 
代码语言:txt
复制
    此时源表数据的递归树结构如下图所示:
代码语言:txt
复制
    将regular\_etl.sql文件中的SET hivevar:cur\_date = CURRENT\_DATE();行改为SET hivevar:cur\_date = '2016-07-29';后,执行定期装载
代码语言:javascript
复制
./regular_etl_tree.sh
代码语言:txt
复制
    查询维度表数据
代码语言:javascript
复制
select * from dw.tree_dim;
代码语言:txt
复制
    结果如下图所示:
代码语言:txt
复制
    从查询结果看到,现在维度表中共有31条记录,其中新增2条当前版本记录(因为改名),更新了5行老版本的过期时间(2行因为改名,3行因为节点删除),被设置为'2016-07-28'。
代码语言:txt
复制
    装载实验完成后,还原regular\_etl.sh脚本,将文件中的SET hivevar:cur\_date = DATE\_ADD(CURRENT\_DATE(),2);行改为SET hivevar:cur\_date = CURRENT\_DATE();

3. 树的展开

代码语言:txt
复制
    有些BI工具的前端不支持递归,这时递归层次树的数据交付技术就是“展开”(explode)递归树。展开是这样一种行为,一边遍历递归树,一边产生新的结构,该结构包含了贯穿树中所有层次的每个可能的关系。展开的结果是一个非递归的关系对表,该表也可能包含描述层次树中关系所处位置的有关属性。下图展示了一个展开树的例子,图中左侧为原递归树数据,右边为树展开后的数据。

将树展开消除了对递归查询的需求,因为层次不再需要自连接。当按这种表格形式将数据交付时,使用简单的SQL查询就可以生成层次树报表。下面说明树展开的实现。

代码语言:javascript
复制
-- 建立展开后的目标表
use rds;
create table tree_expand
(  
  c_child        int,  
  c_parent       int,
  distance       int
);
代码语言:txt
复制
    许多关系数据库都提供递归查询的功能,例如在Oracle中,就可以使用下面的代码展开递归树。
代码语言:javascript
复制
-- Oracle实现
insert into tree_expand (c_child, c_parent, distance)
with rec (c_child, c_parent, distance) as (
  select c_child, c_child, 0
  from tree
  union all
  select r.c_child, s.c_parent, r.distance + 1
  from rec r
  join tree s
    on r.c_parent = s.c_child
  where s.c_parent is not null
)
select * from rec;
代码语言:txt
复制
    目前Hive还没有递归查询功能,但可以使用UDTF来实现。下面的代码取自[https://www.pythian.com/blog/recursion-in-hive/](https://www.pythian.com/blog/recursion-in-hive/)(原来的代码中缺少import部分),它使用Scala语言实现了一个UDTF用于展开树。关于UDTF的API说明,参考[https://hive.apache.org/javadocs/r0.10.0/api/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.html](https://hive.apache.org/javadocs/r0.10.0/api/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.html)。
代码语言:javascript
复制
package UDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorFactory, StructObjectInspector, ObjectInspector, PrimitiveObjectInspector}

class ExpandTree2UDTF extends GenericUDTF {
  var inputOIs: Array[PrimitiveObjectInspector] = null
  val tree: collection.mutable.Map[String,Option[String]] = collection.mutable.Map()

  override def initialize(args: Array[ObjectInspector]): StructObjectInspector = {
    inputOIs = args.map{_.asInstanceOf[PrimitiveObjectInspector]}
    val fieldNames = java.util.Arrays.asList("id", "ancestor", "level")
    val fieldOI = primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector.asInstanceOf[ObjectInspector]
    val fieldOIs = java.util.Arrays.asList(fieldOI, fieldOI, fieldOI)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }

  def process(record: Array[Object]) {
    val id = inputOIs(0).getPrimitiveJavaObject(record(0)).asInstanceOf[String]
    val parent = Option(inputOIs(1).getPrimitiveJavaObject(record(1)).asInstanceOf[String])
    tree += ( id -> parent )
  }

  def close {
    val expandTree = collection.mutable.Map[String,List[String]]()
    def calculateAncestors(id: String): List[String] =
      tree(id) match { case Some(parent) => id :: getAncestors(parent) ; case None => List(id) }
    def getAncestors(id: String) = expandTree.getOrElseUpdate(id, calculateAncestors(id))
    tree.keys.foreach{ id => getAncestors(id).zipWithIndex.foreach{ case(ancestor,level) => forward(Array(id, ancestor, level)) } }
  }
}
代码语言:txt
复制
    将这段代码编译成jar包后,就可以提供给Hive使用。这里生成的jar文件名为Test-0.0.1-SNAPSHOT.jar。
代码语言:txt
复制
    使用下面的命令将相关jar包复制到HDFS。
代码语言:javascript
复制
hdfs dfs -put Test-0.0.1-SNAPSHOT.jar /tmp/
hdfs dfs -put scala-library.jar /tmp/
代码语言:txt
复制
    执行下面的HiveQL进行测试。
代码语言:javascript
复制
-- 添加运行时jar包
add jar hdfs://cdh2:8020/tmp/Test-0.0.1-SNAPSHOT.jar;
add jar hdfs://cdh2:8020/tmp/scala-library.jar;

-- 建立函数
create function expand_tree as 'UDF.ExpandTree2UDTF';

-- 使用UDTF生成展开后的数据
insert overwrite table rds.tree_expand
select expand_tree(cast(c_child as string), cast(c_parent as string)) from rds.tree;

-- 查询树展开后的数据
select * from rds.tree_expand;
代码语言:txt
复制
    查询结果如下图所示。

4. 查询

代码语言:txt
复制
    Hive本身还没有递归查询功能,但正如前面提到的,使用简单的SQL查询递归树展开后的数据,即可生成层次树报表,例如下面的HiveQL语句实现了从下至上的树的遍历。
代码语言:javascript
复制
select c_child, concat_ws('/',collect_set(cast(c_parent as string))) as c_path from tree_expand group by c_child;
代码语言:txt
复制
    查询结果如下图所示。

5. 递归树的平面化

代码语言:txt
复制
    递归树适合于数据仓库,而非递归结构则更适合于数据集市。前面的递归树展开用于消除递归查询,但缺点在于为检索与实体相关的属性必须执行额外的连接操作。而对于层次树来说,很常见的情况是,层次树元素所拥有的唯一属性就是描述属性(本例中的c\_name字段),并且树的最大深度是固定的(本例是4层)。对这种情况,最好是将层次树作为平面化的1NF结或者2NF结构交付给数据集市。这类平面化操作对于平衡的层次树发挥得最好,但将缺失的层次置空可能可能会形成不整齐的层次树,因此它对深度未知的层次树(列数不固定)来说并不是一种有用的技术。下面说明递归树平面化的实现。
代码语言:javascript
复制
-- 建立展开后的目标表
use rds;
create table tree_complanate
(  
  c_0        int,  
  c_0_name   varchar(100), 
  c_1        int,  
  c_1_name   varchar(100),
  c_2        int,  
  c_2_name   varchar(100),
  c_3        int,  
  c_3_name   varchar(100)
);
代码语言:txt
复制
    下面的语句生成递归树平面化后的数据,每个叶子节点一行。
代码语言:javascript
复制
insert overwrite table rds.tree_complanate
select t0.c_0 c_0,t1.c_name c_0_name,
       t0.c_1 c_1,t2.c_name c_1_name,
       t0.c_2 c_2,t3.c_name c_2_name,
       t0.c_3 c_3,t4.c_name c_3_name
  from (select list[3] c_0,list[2] c_1,list[1] c_2,list[0] c_3 
          from (select c_child,split(c_path,'/') list
                  from (select c_child, concat_ws('/',collect_set(cast(c_parent as string))) as c_path 
                          from tree_expand 
                         group by c_child) t) t
 where size(list) = 4) t0 
 inner join (select * from tree) t1 on t0.c_0= t1.c_child
 inner join (select * from tree) t2 on t0.c_1= t2.c_child
 inner join (select * from tree) t3 on t0.c_2= t3.c_child
 inner join (select * from tree) t4 on t0.c_3= t4.c_child;
代码语言:txt
复制
    查询数据
代码语言:javascript
复制
select c_0, c_0_name, c_1, c_1_name, c_2, c_2_name, c_3, c_3_name
  from rds.tree_complanate;
代码语言:txt
复制
    查询结果如下图所示。
举报
领券