A Hive zipper-table tool in practice

This is a practical tool written by Ding Yanming, who insists on solving problems with code. Recommended!

Readers with similar business needs are welcome to discuss it with him. The article follows.


1. Background

Hello everyone,

Recently, driven by a business need at my company, I wrote a Hive zipper-table (slowly changing dimension) tool. Below is a brief introduction.

The tool is called zipperu (meaning "zipper utility"); it consists of the bin, conf, historys, logs, and tmp directories.

2. How it works

The idea: for your business table (the table you refresh daily), you pick the columns you care about (for example, if phonenumber changes you consider the row changed and want to close out its history record). The tool concatenates those columns and computes an MD5 hash per row, then compares that hash against the one stored in the history table. If the hash differs, the row is treated as changed and its history is updated; otherwise it is left alone.
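As an illustration only (the tool itself does this inside Hive with md5(concat(...))), here is a minimal shell sketch of the change-detection idea, using made-up sample data and md5sum in place of Hive's md5():

```shell
#!/bin/sh
# Hypothetical snapshots of a business table: key,phonenumber.
# Yesterday's state (what the history table holds) vs today's load.
cat > /tmp/zipperu_old.csv <<'EOF'
1,13800000000
2,13900000000
EOF
cat > /tmp/zipperu_new.csv <<'EOF'
1,13800000000
2,13911111111
EOF

# Hash the watched column per key, mirroring md5(concat(column)) in Hive.
hash_rows() {
  while IFS=',' read -r k v; do
    printf '%s %s\n' "$k" "$(printf '%s' "$v" | md5sum | cut -d' ' -f1)"
  done < "$1"
}

hash_rows /tmp/zipperu_old.csv > /tmp/zipperu_old.hash
hash_rows /tmp/zipperu_new.csv > /tmp/zipperu_new.hash

# Keys whose hash changed are the rows whose history must be updated.
join /tmp/zipperu_old.hash /tmp/zipperu_new.hash | awk '$2 != $3 {print $1}'
```

Only key 2 is reported, since its phonenumber changed; key 1 hashes identically and is left alone.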

\bin:

Contains a single script, zipperu.sh, which handles all the tasks and the business logic.

\conf:

conf contains a zipperu.conf file; its entries look like this:

tableN=xxx (the business table to process, written as dbname.tablename)

rowkeys={customerid} (the braces hold the primary key of the business table; for a composite key, separate the columns with commas, e.g. {id,id2,id3}, with no comma after the last one)

tableMD5=xxxx (the name of the MD5 table that will be generated)

column={birthday} (birthday is the dimension of the business table you care about; if this column changes, the row is considered updated for that day)

Each line of zipperu.conf describes one table to process; fields on a line are separated by tabs or spaces.
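A sketch of what one config line might look like and how zipperu.sh pulls the fields out of it (the table and column names here are hypothetical):

```shell
#!/bin/sh
# Hypothetical zipperu.conf line: four fields separated by spaces or tabs.
linet='tableN=dw.customer rowkeys={customerid} tableMD5=dw.customer_md5 column={phonenumber,birthday}'

# Extraction as zipperu.sh does it: pick the Nth field, take the part
# after '=', and strip the braces where needed.
tableN=$(echo "$linet"   | awk '{print $1}' | awk -F '=' '{print $2}')
rowkeys=$(echo "$linet"  | awk '{print $2}' | awk -F '=' '{print $2}' | sed 's/[{}]//g')
tableMD5=$(echo "$linet" | awk '{print $3}' | awk -F '=' '{print $2}')
column=$(echo "$linet"   | awk '{print $4}' | awk -F '=' '{print $2}' | sed 's/[{}]//g')

echo "$tableN | $rowkeys | $tableMD5 | $column"
# -> dw.customer | customerid | dw.customer_md5 | phonenumber,birthday
```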

\historys:

historys holds the SQL scripts generated automatically by each day's zipper run

\logs:

logs holds the run logs for each day's jobs

\tmp:

tmp holds the temporary files produced while the script runs. Do not put anything in the tmp directory: the script wipes it on startup.

Deletes are not supported yet; only inserts and updates are. Given my limited skill, please bear with me!

3. The code

Code is hard to format inside WeChat; I suggest copying it out into an editor to read.

#!/bin/bash
. /etc/profile
cd `dirname $0`
logs_data=`date +%F`
confFile=../conf/zipperu.conf
mkdir -p ../logs/$logs_data
mkdir -p ../historys/$logs_data
cat $confFile | while read linet;
 do
 rm -rf ../tmp/*
 mkdir -p ../tmp
 echo "---- reading config file $confFile ----"
 if [[ "$confFile" = "" ]] ; then
   echo "---- the config file path is empty; please supply a valid config file ----"
   exit 1
   else
   echo "---- this run uses config file: $confFile ----"
   fi
 tableN=`echo $linet | awk '{print $1}'|awk -F '=' '{print $2}'`
 if [[ "$tableN" = "" ]] ; then
                        echo "config line '$linet': the hive table is not set; please fix the config"
                        exit 1
 else
                        echo "---- this run processes hive table: $tableN ----"
                fi
 rowkeys=`echo $linet | awk '{print $2}'|awk -F '=' '{print $2}' |sed 's/[{}]//g'`
                if [[ "$rowkeys" = "" ]] ; then
                        echo "config line '$linet': the hive table primary key is not set; please fix the config"
                        exit 1
                else
 echo $rowkeys >> ../tmp/$tableN.rowkeys_tmp1
 # split the comma-separated key list into one key per line
 cat ../tmp/$tableN.rowkeys_tmp1 | tr -s ',' '\n' | sed s/[[:space:]]//g >> ../tmp/$tableN.rowkeys_tmp2
 rowkey=`sed -n '1p' ../tmp/$tableN.rowkeys_tmp2`
 rowkeysn=`cat ../tmp/$tableN.rowkeys_tmp2 |wc -l`
                        echo "---- this run uses primary key: $rowkey ----"
                fi
 tableMD5=`echo $linet | awk '{print $3}'|awk -F '=' '{print $2}'`
                if [[ "$tableMD5" = "" ]] ; then
                        echo "config line '$linet': the hive MD5 table is not set; please fix the config"
                        exit 1
                else
                        echo "---- this run writes MD5 table: $tableMD5 ----"
                fi
 column=`echo $linet | awk '{print $4}'|awk -F '=' '{print $2}'|sed 's/[{}]//g'` # watched columns from the config file
 if [[ "$column" = "" ]] ; then
                        echo "config line '$linet': no watched columns are set"
                        exit 1
                else
 echo $rowkey
 start_time=`date "+%Y%m%d%H%M%S"`
 start_date=`date +%F`
 end_date=`date +%F`
 etl_time=`date '+%Y-%m-%d %H:%M:%S'`
 tableMD5_Y="${tableMD5}_Y"
 tableN_his="${tableN}_his"
 tableN_tmp_h="${tableN}_tmp_h"
 tableN_tmp_c="${tableN}_tmp_c"
                        echo "---- this run watches columns: $column ----"
 echo $column >> ../tmp/$tableN.tmp
 # split the comma-separated column list into one column per line
 cat ../tmp/$tableN.tmp | tr -s ',' '\n' | sed s/[[:space:]]//g > ../tmp/$tableN.tmp2
 rm -rf ../tmp/$tableN.tmp
 ln=`cat ../tmp/$tableN.tmp2 | wc -l`
 if [[ "$ln" -gt  "1" ]] ; then
 var=0
 linenum=`wc -l < ../tmp/$tableN.tmp2`
 linenum1=$((linenum-1))
 for line in `cat ../tmp/$tableN.tmp2`;
 do
 if [ $linenum1 -eq $var ] ; then
   echo "coalesce($line,''),','" >> ../tmp/$tableN.tmp3 # last watched column: no trailing comma
   else
 echo "coalesce($line,''),','," >> ../tmp/$tableN.tmp3 # not the last column: keep the trailing comma
   fi
    ((var+=1))
 done
 rm -rf ../tmp/$tableN.tmp2
 column2=`cat ../tmp/$tableN.tmp3`
 echo $column2 >> ../tmp/$tableN.tmp4
 cat ../tmp/$tableN.tmp4 | sed s/[[:space:]]//g > ../tmp/$tableN.tmp5
 column2=`cat ../tmp/$tableN.tmp5`
###############################################################################################
#fetch the columns of $tableN (all columns of the business table, used to create the partitioned history table)
 hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1
 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2
 rm -rf ../tmp/$tableN.colsinfo_tmp1
 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`
 echo $tableNcolsn
 sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2
 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`
 sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 
 echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"
 echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"
 echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3
 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3
 echo "versions" >>../tmp/$tableN.colsinfo_tmp3
 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3
 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;
 do
 echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4
 done
 echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5
 hall=`cat ../tmp/$tableN.colsinfo_tmp5`
 echo "$hall"
 expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1
 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;
 do
 echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2
 done 
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3
 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`
 if [[ "$rowkeysn" -eq  "1" ]] ; then
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >>../historys/$logs_data/$start_time$tableN_his.create.sql 
 else 
 sed -i '1d' ../tmp/$tableN.rowkeys_tmp2
 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;
 do
 echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3
 done 
 rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date='$end_date') select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 fi
 nohup hive -e "$sql1 $sql2 $sql3" >> ../logs/$logs_data/$start_time$tableN_his.log
 else
################################### case: only one column goes into the MD5 hash
 column2="$column"
 #fetch the columns of $tableN (all columns of the business table, used to create the partitioned history table)
 hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1
 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2
 rm -rf ../tmp/$tableN.colsinfo_tmp1
 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`
 sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2
 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`
 sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 
 echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"
 sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"
 echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3
 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3
 echo "versions" >>../tmp/$tableN.colsinfo_tmp3
 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3
 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;
 do
 echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4
 done
 echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5
 hall=`cat ../tmp/$tableN.colsinfo_tmp5`
 echo "$hall"
 expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1
 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;
 do
 echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2
 done 
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3
 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`
 if [[ "$rowkeysn" -eq  "1" ]] ; then
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 else 
 sed -i '1d' ../tmp/$tableN.rowkeys_tmp2
 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;
 do
 echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3
 done 
 rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 fi
# hive -e "$sql0"
 nohup hive -e "$sql1 $sql2 $sql3"  >> ../logs/$logs_data/$start_time$tableN_his.log 
 fi 
 fi
 rm -rf ../tmp/*
 done
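One idiom in the script worth calling out: `awk '{if(s){print s};s=$0}END{sub(",$","");print}'` prints every line one step late so that it can strip the trailing comma from the last line only. A standalone demonstration on hypothetical input:

```shell
#!/bin/sh
# Buffer each line in s; print the previous line on every new one, and at
# END strip the trailing comma from the final record ($0) before printing.
printf 'h.id,\nh.name,\nh.birthday,\n' \
  | awk '{if(s){print s};s=$0}END{sub(",$","");print}'
# -> h.id,
#    h.name,
#    h.birthday
```

The script uses this to turn a generated column list into valid SQL, where only the last select item must drop its separator.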

Originally published on the WeChat public account 大数据和云计算技术 (jiezhu2007).

Original publication date: 2017-07-11
