A Hive Zipper-Table Tool in Practice

This is a hands-on tool written by Ding Yanming, who insists on solving problems with code. Recommended!

If you work on similar business problems, you are welcome to join the discussion. The article follows.


1. Background

Hi everyone,

Driven by a recent business need at my company, I wrote a Hive zipper-table tool (a zipper table is the common Chinese term for a type-2 slowly changing dimension history table). Below is a brief introduction.

The tool is called zipperu (short for "zipper utility") and consists of five directories: bin, conf, historys, logs, and tmp.

2. How it works

For each business table (a table you update daily), you declare the columns you care about (for example, if phonenumber changes, the row counts as changed and its history record should be closed out). The tool concatenates those columns and computes an MD5 hash; if the hash of the current snapshot differs from that of the open history record, the row is treated as updated and a new version is written, otherwise the row is left alone.
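As a minimal sketch of that check (with made-up values; the real tool hashes inside Hive with md5(concat(...))), the same comparison can be reproduced with md5sum:

```shell
#!/bin/bash
# Hypothetical snapshots of the watched columns, already concatenated
# the way the tool's generated SQL does with concat(...).
old_cols="13800000000,1990-01-01"   # yesterday: phonenumber,birthday
new_cols="13900000000,1990-01-01"   # today: phonenumber changed

old_md5=$(printf '%s' "$old_cols" | md5sum | awk '{print $1}')
new_md5=$(printf '%s' "$new_cols" | md5sum | awk '{print $1}')

if [ "$old_md5" != "$new_md5" ]; then
  echo "row changed: close the old history record and write a new version"
else
  echo "row unchanged: leave the open history record alone"
fi
```

In the tool itself this comparison runs per row in HiveQL, not in the shell.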

\bin:

Contains a single script, zipperu.sh, which drives all the tasks and the business logic.

\conf:

conf contains a file named zipperu.conf with entries such as:

tableN=xxx — the business table to process, written as database.table

rowkeys={customerid} — the primary key(s) of the business table go inside the braces; separate multiple keys with commas, e.g. {id,id2,id3}, with no trailing comma after the last one

tableMD5=xxxx — the name of the MD5 table to generate

column={birthday} — the columns of the business table to watch; if any of them changes, the row is considered updated that day

Each line of zipperu.conf describes one table to process; fields are separated by tabs or spaces.
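For example, a zipperu.conf line for a hypothetical customer table could look like this (all names are made up):

```
tableN=dw.customer rowkeys={customerid} tableMD5=dw.customer_md5 column={birthday,phonenumber}
```

The script splits each line into exactly four whitespace-separated fields with awk, so the order matters and no field may contain spaces.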

\historys:

historys holds the SQL scripts generated automatically by each day's zipper run.

\logs:

logs holds the records of each day's runs.

\tmp:

tmp holds temporary files created while the script runs. Do not put any files in tmp: the script empties it on startup.
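Putting it together, the on-disk layout looks like:

```
zipperu/
├── bin/       zipperu.sh, the driver script
├── conf/      zipperu.conf
├── historys/  generated SQL scripts, one subdirectory per day
├── logs/      run logs, one subdirectory per day
└── tmp/       scratch files, emptied on every run
```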

Deletes are not supported yet; only inserts and updates are handled. Please bear with the limits of my skill!

3. The code

Code is hard to read inside WeChat; I suggest copying it into an editor.

#!/bin/bash
. /etc/profile
cd `dirname $0`
logs_data=`date +%F`
confFile=../conf/zipperu.conf
mkdir -p ../logs/$logs_data
mkdir -p ../historys/$logs_data
cat $confFile | while read linet;
 do
 # recreate tmp; without mkdir -p the later redirects into ../tmp/ would fail
 rm -rf ../tmp/ && mkdir -p ../tmp
 echo "-------------------- Reading config file $confFile --------------------"
 if [[ "$confFile" = "" ]] ; then
   echo "-------------------- The config file is empty; please supply a valid config file! --------------------"
   exit 1
 else
   echo "-------------------- This zipper run uses config file: $confFile --------------------"
 fi
 tableN=`echo $linet | awk '{print $1}'|awk -F '=' '{print $2}'`
 if [[ "$tableN" = "" ]] ; then
   echo "The Hive table in config line $linet is empty; please fix the configuration"
   exit 1
 else
   echo "-------------------- Hive table for this zipper run: $tableN --------------------"
 fi
 rowkeys=`echo $linet | awk '{print $2}'|awk -F '=' '{print $2}' |sed 's/}//g'|sed 's/{//g'`
 if [[ "$rowkeys" = "" ]] ; then
   echo "The primary key in config line $linet is empty; please fix the configuration"
   exit 1
 else
   echo $rowkeys >> ../tmp/$tableN.rowkeys_tmp1
   # split the comma-separated key list into one key per line
   cat ../tmp/$tableN.rowkeys_tmp1 | tr -s ',' '\n' | sed s/[[:space:]]//g >> ../tmp/$tableN.rowkeys_tmp2
   rowkey=`sed -n '1p' ../tmp/$tableN.rowkeys_tmp2`
   rowkeysn=`cat ../tmp/$tableN.rowkeys_tmp2 |wc -l`
   echo "-------------------- Primary key(s) for this zipper run: $rowkey --------------------"
 fi
 tableMD5=`echo $linet | awk '{print $3}'|awk -F '=' '{print $2}'`
 if [[ "$tableMD5" = "" ]] ; then
   echo "The MD5 table in config line $linet is empty; please fix the configuration"
   exit 1
 else
   echo "-------------------- MD5 table for this zipper run: $tableMD5 --------------------"
 fi
 column=`echo $linet | awk '{print $4}'|awk -F '=' '{print $2}'|sed 's/}//g'|sed 's/{//g'` # watched columns from the config
 if [[ "$column" = "" ]] ; then
   echo "The watched column list in config line $linet is empty"
   exit 1
 else
   echo $rowkey
   start_time=`date "+%Y%m%d%H%M%S"`
   start_date=`date +%F`
   end_date=`date +%F`
   etl_time=`date '+%Y-%m-%d %H:%M:%S'`
   tableMD5_Y="${tableMD5}_Y"
   tableN_his="${tableN}_his"
   tableN_tmp_h="${tableN}_tmp_h"
   tableN_tmp_c="${tableN}_tmp_c"
   echo "-------------------- Watched column(s) for this zipper run: $column --------------------"
   echo $column >> ../tmp/$tableN.tmp
   # split the comma-separated column list into one column per line
   cat ../tmp/$tableN.tmp | tr -s ',' '\n' | sed s/[[:space:]]//g > ../tmp/$tableN.tmp2
   rm -rf ../tmp/$tableN.tmp
   ln=`cat ../tmp/$tableN.tmp2 | wc -l`
 if [[ "$ln" -gt  "1" ]] ; then
 var=0
 for line in `cat ../tmp/$tableN.tmp2`;
 do
 linenum=`awk '{print NR}' ../tmp/$tableN.tmp2 |tail -n1`
 linenum1=`echo $[linenum-1]`
 if [ $linenum1 -eq $var ] ; then
   echo "coalesce($line,''),','" >> ../tmp/$tableN.tmp3 # last watched column: no trailing comma
 else
   echo "coalesce($line,''),','," >> ../tmp/$tableN.tmp3 # not the last column: keep the trailing comma
 fi
 ((var+=1))
 done
 rm -rf ../tmp/$tableN.tmp2
 column2=`cat ../tmp/$tableN.tmp3`
 echo $column2 >> ../tmp/$tableN.tmp4
 cat ../tmp/$tableN.tmp4 | sed s/[[:space:]]//g > ../tmp/$tableN.tmp5
 column2=`cat ../tmp/$tableN.tmp5`
###############################################################################################
# fetch every column of the business table $tableN (used to create the partitioned history table)
 hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1
 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2
 rm -rf ../tmp/$tableN.colsinfo_tmp1
 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`
 echo $tableNcolsn
 sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2
 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`
 sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 
 echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"
 echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"
 echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3
 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3
 echo "versions" >>../tmp/$tableN.colsinfo_tmp3
 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3
 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;
 do
 echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4
 done
 echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5
 hall=`cat ../tmp/$tableN.colsinfo_tmp5`
 echo "$hall"
 expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1
 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;
 do
 echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2
 done 
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3
 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`
 if [[ "$rowkeysn" -eq  "1" ]] ; then
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >>../historys/$logs_data/$start_time$tableN_his.create.sql 
 else 
 sed -i '1d' ../tmp/$tableN.rowkeys_tmp2
 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;
 do
 echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3
 done 
 rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;" # dynamic partition(end_date), as in the other branches: $hall already supplies end_date as its last column
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 fi
 nohup hive -e "$sql1 $sql2 $sql3" >> ../logs/$logs_data/$start_time$tableN_his.log
 else
################################### case: only one column goes into the MD5 hash
 column2="$column"
 # fetch every column of the business table $tableN (used to create the partitioned history table)
 hive -e "desc $tableN;" >> ../tmp/$tableN.colsinfo_tmp1
 expand ../tmp/$tableN.colsinfo_tmp1 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2
 rm -rf ../tmp/$tableN.colsinfo_tmp1
 tableNcolsn=`cat ../tmp/$tableN.colsinfo_tmp2 |wc -l`
 sed -i 's/$/,/' ../tmp/$tableN.colsinfo_tmp2
 tableNcols=`cat ../tmp/$tableN.colsinfo_tmp2`
 sql0="create table $tableN_his($tableNcols etl_time string , versions int , start_date string) partitioned by (end_date string);" 
 echo $sql0 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 sql1="drop table if exists $tableN_tmp_h;create table $tableN_tmp_h as select *,md5(concat($column2)) as md5_str from $tableN_his where end_date = '3000-12-31';"
 sql2="drop table if exists $tableN_tmp_c;create table $tableN_tmp_c as select *,md5(concat($column2)) as md5_str from $tableN;"
 echo $sql1 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 echo $sql2 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 awk '{print $1}' ../tmp/$tableN.colsinfo_tmp2 > ../tmp/$tableN.colsinfo_tmp3
 echo "etl_time" >>../tmp/$tableN.colsinfo_tmp3
 echo "versions" >>../tmp/$tableN.colsinfo_tmp3
 echo "start_date" >>../tmp/$tableN.colsinfo_tmp3
 cat ../tmp/$tableN.colsinfo_tmp3 | while read fiel;
 do
 echo "h.$fiel," >> ../tmp/$tableN.colsinfo_tmp4
 done
 echo "'$end_date' as end_date" >> ../tmp/$tableN.colsinfo_tmp4
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp4 >> ../tmp/$tableN.colsinfo_tmp5
 hall=`cat ../tmp/$tableN.colsinfo_tmp5`
 echo "$hall"
 expand ../tmp/$tableN.colsinfo_tmp2 | tr -s ' ' >>../tmp/$tableN.colsinfo_tmp2_1
 cat ../tmp/$tableN.colsinfo_tmp2_1 | awk  '{print $1}'| while read fiel2;
 do
 echo "case when c.$rowkey is not null then c.$fiel2 else h.$fiel2 end as $fiel2," >>../tmp/$tableN.colsinfo_tmp2_2
 done 
 awk '{if(s){print s};s=$0}END{sub(",$","");print}' ../tmp/$tableN.colsinfo_tmp2_2 >> ../tmp/$tableN.colsinfo_tmp2_3
 allcase=`cat ../tmp/$tableN.colsinfo_tmp2_2`
 if [[ "$rowkeysn" -eq  "1" ]] ; then
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 else 
 sed -i '1d' ../tmp/$tableN.rowkeys_tmp2
 cat ../tmp/$tableN.rowkeys_tmp2 | while read fiel3;
 do
 echo "and h.$fiel3 = c.$fiel3" >>../tmp/$tableN.rowkeys_tmp3
 done 
 rowksys=`cat ../tmp/$tableN.rowkeys_tmp3`
 sql3="from $tableN_tmp_h h full outer join $tableN_tmp_c c on h.$rowkey = c.$rowkey $rowksys insert overwrite table $tableN_his partition(end_date) select $hall where h.$rowkey is not null and c.$rowkey is not null and h.md5_str <> c.md5_str insert overwrite table $tableN_his partition(end_date='3000-12-31') select $allcase '$etl_time' as etl_time,case when h.$rowkey is null then 0 when h.$rowkey is not null and c.$rowkey is not null and h.md5_str<>c.md5_str then h.versions+1 else h.versions end as versions, IF (h.$rowkey IS not NULL AND c.$rowkey IS NOT NULL and h.md5_str = c.md5_str,h.start_date,'$start_date') AS start_date;"
 echo $sql3 >> ../historys/$logs_data/$start_time$tableN_his.create.sql
 fi
# hive -e "$sql0"
 nohup hive -e "$sql1 $sql2 $sql3"  >> ../logs/$logs_data/$start_time$tableN_his.log 
 fi 
 fi
 rm -rf ../tmp/*
 done
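To make the string-building above concrete, here is roughly the multi-insert statement the script emits for a single-key table; the table dw.customer, key customerid, watched column birthday, and the dates are hypothetical examples:

```sql
-- The first insert closes out open records whose watched columns changed;
-- the second rewrites the open ('3000-12-31') partition with the new versions.
FROM dw.customer_tmp_h h
FULL OUTER JOIN dw.customer_tmp_c c ON h.customerid = c.customerid
INSERT OVERWRITE TABLE dw.customer_his PARTITION (end_date)
  SELECT h.customerid, h.birthday, h.etl_time, h.versions, h.start_date,
         '2017-07-11' AS end_date
  WHERE h.customerid IS NOT NULL AND c.customerid IS NOT NULL
    AND h.md5_str <> c.md5_str
INSERT OVERWRITE TABLE dw.customer_his PARTITION (end_date = '3000-12-31')
  SELECT CASE WHEN c.customerid IS NOT NULL THEN c.customerid ELSE h.customerid END AS customerid,
         CASE WHEN c.customerid IS NOT NULL THEN c.birthday ELSE h.birthday END AS birthday,
         '2017-07-11 00:00:00' AS etl_time,
         CASE WHEN h.customerid IS NULL THEN 0
              WHEN h.customerid IS NOT NULL AND c.customerid IS NOT NULL
                   AND h.md5_str <> c.md5_str THEN h.versions + 1
              ELSE h.versions END AS versions,
         IF(h.customerid IS NOT NULL AND c.customerid IS NOT NULL
            AND h.md5_str = c.md5_str, h.start_date, '2017-07-11') AS start_date;
```

Rows present only on the current side start at version 0 with today's start_date; unchanged rows keep their version and start_date; changed rows get version+1 and a fresh start_date.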

Originally published on the WeChat official account 大数据和云计算技术 (jiezhu2007).

Original publication date: 2017-07-11

