Archiving (HAR). Hive has built-in support for converting the files in an existing partition into a Hadoop Archive (HAR), so that a partition that once consisted of 100 files takes up only about 3 files (depending on settings).
Note, however, that an archived partition can only be queried; updates and writes are not supported.
set hive.archive.enabled=true;
set hive.archive.har.parentdir.settable=true;
set har.partfile.size=1099511627776;
#Archive one partition of a table
alter table test_rownumber2 archive partition(dt='20230324');
#Unarchive the partition
alter table test_rownumber2 unarchive partition(dt='20230324');
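For a table with many historical partitions, the two ALTER statements above can be generated in bulk from a `SHOW PARTITIONS` listing. A minimal sketch, assuming single-level `dt` partitions as in the example; the partition list is hard-coded here for illustration, and in practice would come from `hive -e "SHOW PARTITIONS test_rownumber2"`:

```shell
#!/bin/bash
# Generate "alter table ... archive partition(...)" statements from a
# SHOW PARTITIONS-style listing. The list is hard-coded for illustration;
# a real run would use: hive -e "SHOW PARTITIONS test_rownumber2"
table=test_rownumber2
partitions="dt=20230322 dt=20230323 dt=20230324"

stmts=""
for p in ${partitions}; do
  # Turn dt=20230324 into partition(dt='20230324')
  key=${p%%=*}
  val=${p#*=}
  stmts="${stmts}alter table ${table} archive partition(${key}='${val}');
"
done
printf '%s' "${stmts}"
```

The generated statements can then be fed to hive or beeline in one batch; swapping `archive` for `unarchive` reverses the operation.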
Reference: https://www.docs4dev.com/docs/zh/apache-hive/3.1.1/reference/LanguageManual_Archiving.html
For ORC files, you can use Hive's built-in concatenate command to merge small files automatically.
#For a non-partitioned table
alter table A concatenate;
#For a partitioned table
alter table B partition(dt='2021-05-07',hr='12') concatenate;
Notes:
1. The concatenate command supports only the RCFILE and ORC file types.
2. When merging small files with concatenate, you cannot specify the number of resulting files, but you can run the command multiple times.
3. If the file count stops changing after several concatenate runs, this is related to the setting mapreduce.input.fileinputformat.split.minsize=256mb, which controls the minimum size of each file.
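Notes 2 and 3 suggest a simple loop: rerun concatenate until the partition's file count stops dropping. A sketch of that control flow, with the hive and hdfs calls stubbed out so it can run stand-alone; the real commands are noted in the comments:

```shell
#!/bin/bash
# Rerun concatenate until the partition's file count stops changing.
# hive/hdfs are stubbed so the loop itself can be demonstrated; replace
# the two functions with the real commands noted in the comments.

run_concat() {
  # Real version: hive -e "alter table B partition(dt='2021-05-07',hr='12') concatenate;"
  :
}

count_files() {
  # Real version: curr=$(hdfs dfs -count <partition path> | awk '{print $2}')
  # Stub: simulate the count shrinking 100 -> 40 -> 12 -> 12
  curr=${counts%% *}
  counts=${counts#* }
}

counts="100 40 12 12"
prev=-1
passes=0
while [ ${passes} -lt 5 ]; do   # cap the number of passes
  run_concat
  count_files
  passes=$((passes + 1))
  if [ "${curr}" = "${prev}" ]; then
    break                       # no further improvement, stop
  fi
  prev=${curr}
done
echo "stopped after ${passes} passes with ${curr} files"
```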
Regenerate the data with SQL, using insert overwrite.
Create an intermediate table with the same schema as the original table, then load the data from the original table into it. After confirming the data matches, rename the intermediate table to the original table name, rename the original table to a temporary name, and finally drop the temporary table.
#1. Create the intermediate table (its schema must match the original table's)
create table test.test_table_hive_merge like test.test_table_hive;
#If the data is stored on COS or OFS, you may need to change the storage location after creating the table; the default is an HDFS path.
alter table test_table_hive_merge set location 'cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive_merge';
#2. Set session parameters related to file merging
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=3000;
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.tezfiles=true;
set hive.merge.sparkfiles=true;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.size.per.task=512000000;
#3. Merge the files into the intermediate table; make sure nothing is writing to the original table before running this
#If there are multiple partition levels, list all partition columns in partition()
INSERT OVERWRITE TABLE test.test_table_hive_merge partition(batch_date) SELECT * FROM test.test_table_hive;
#4. Compare the row counts of the original and intermediate tables
set hive.compute.query.using.stats=false;
set hive.fetch.task.conversion=none;
SELECT count(*) FROM test.test_table_hive;
SELECT count(*) FROM test.test_table_hive_merge;
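Step 5's rename should only happen when the two counts from step 4 match, which is easy to automate. A sketch of the decision logic; `query_count` is a hypothetical stand-in for `hive -e "SELECT count(*) FROM <table>;"`, and the counts are illustrative:

```shell
#!/bin/bash
# Only rename when the row counts of original and intermediate table match.
# query_count is a stand-in for: hive -e "SELECT count(*) FROM $1;"
query_count() {
  case "$1" in
    test.test_table_hive)       echo 1000000 ;;  # illustrative counts
    test.test_table_hive_merge) echo 1000000 ;;
  esac
}

src=$(query_count test.test_table_hive)
dst=$(query_count test.test_table_hive_merge)
if [ "${src}" = "${dst}" ]; then
  decision="rename"   # safe to run the two alter ... rename statements
else
  decision="abort"    # counts differ: investigate before renaming
fi
echo "src=${src} dst=${dst} -> ${decision}"
```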
#5. After confirming the data matches, rename the original table to a temporary name and the intermediate table to the original name, using alter statements
alter table test.test_table_hive rename to test.test_table_hive_tmp;
alter table test.test_table_hive_merge rename to test.test_table_hive;
#6. Check the number of partitions and small files after merging
hdfs dfs -count cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive
hdfs dfs -du -h cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608
hdfs dfs -du -h cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608 | wc -l
#7. After monitoring for a while, drop the temporary table
drop table test.test_table_hive_tmp;
Note: renaming a Hive table changes its storage path accordingly. If any job writes data to the old path directly, it may need to be updated.
Related script:
#!/bin/bash
if [ -z "$1" ]; then
echo "you have not specified an input file! Each line of the file should be a table name, e.g. default.test"
exit 1
else
echo "the file you input is $1"
fi
hive=/usr/local/service/hive/bin/hive
#If using beeline, change the HiveServer2 address accordingly
#hive=' /usr/local/service/hive/bin/beeline --showDbInPrompt=true -u "jdbc:hive2://10.0.0.30:7001" -n hadoop --outputformat=tsv2 --showHeader=false --silent=true '
#Main session parameters to pass
hive_sets=" set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.max.dynamic.partitions=3000; set hive.exec.max.dynamic.partitions.pernode=5000; set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; set hive.merge.tezfiles=true; set hive.merge.sparkfiles=true; set hive.merge.smallfiles.avgsize=256000000; set hive.merge.size.per.task=512000000; "
#Temporary file for the partition listing
touch /tmp/partitions_tmp.txt ;
#Supports up to 4 partition levels
funGetPart(){
pCount=$1 ;
tName=$2 ;
taHe=`$3 -n 1 /tmp/partitions_tmp.txt ` ;
if [ ${pCount} -eq 2 ];then
partition=$(echo ${taHe} | awk -F '=|/' '{print $1}')
elif [ ${pCount} -eq 4 ];then
partition=$(echo ${taHe} | awk -F '=|/' '{print $1","$3}')
elif [ ${pCount} -eq 6 ];then
partition=$(echo ${taHe} | awk -F '=|/' '{print $1","$3","$5}')
elif [ ${pCount} -eq 8 ];then
partition=$(echo ${taHe} | awk -F '=|/' '{print $1","$3","$5","$7}')
else
echo -e "\033[31m Parsing is not supported \n \033[0m"
exit -1 ;
fi
}
lineTables=$(cat $1)
for dataTab in ${lineTables}
do
#Check that the table name has the expected form: dbname.tablename
dataCount=` echo ${dataTab}| awk -F "." '{print NF}' `
if [ ${dataCount} -ne 2 ];then
echo -e "\033[31m The file content is incorrect!!! \033[0m"
echo -e "\033[31m For example: default.test \n \033[0m"
exit -1 ;
fi
#Check that the table's partition listing is usable
$hive -e "SHOW PARTITIONS ${dataTab} ; " > /tmp/partitions_tmp.txt
if [ ! -s /tmp/partitions_tmp.txt ];then
echo -e "\033[31m The table => ${dataTab} <= has no partition information \n \033[0m"
exit -1 ;
fi
##First compare the partition-field counts of the first and last lines
tpartCount=`(tail -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`
hpartCount=`(head -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`
if [ "${tpartCount}" = "" ] || [ "${hpartCount}" = "" ];then
echo -e "\033[31m The table => ${dataTab} <= has no partition information \n \033[0m"
exit -1 ;
fi
if [ ${tpartCount} -ne ${hpartCount} ];then
echo -e "\033[31m The table => ${dataTab} <= has an inconsistent number of partition levels \n \033[0m"
exit -1 ;
fi
##Then compare the partition keys extracted from the two lines
funGetPart ${hpartCount} ${dataTab} head
hpartValue=${partition}
funGetPart ${tpartCount} ${dataTab} tail
tpartValue=${partition}
if [ "${hpartValue}" != "${tpartValue}" ];then
echo -e "\033[31m The table => ${dataTab} <= has inconsistent partition keys \n \033[0m"
exit -1 ;
fi
echo "Processing table: ${dataTab} , partitions: ${tpartValue} "
echo -e "==> Create table : ${dataTab}_merge from: ${dataTab} \n "
#If the intermediate table already exists, choose whether to continue; by default the script stops
#${hive} -e "create table if not exists ${dataTab}_merge like ${dataTab} ; "
${hive} -e "create table ${dataTab}_merge like ${dataTab} ; "
if [ $? -ne 0 ];then
echo -e "\033[31m ${dataTab}_merge create failed \n \033[0m" ;
exit -1 ;
fi
echo -e "==> Overwrite table : ${dataTab}_merge , partition: ${tpartValue} , from: ${dataTab} \n "
${hive} -e " ${hive_sets} INSERT OVERWRITE TABLE ${dataTab}_merge partition(${tpartValue}) SELECT * FROM ${dataTab} ; "
if [ $? -eq 0 ];then
echo -e "\033[32m ${dataTab}_merge Overwrite success \n \033[0m" ;
#Rename the original table to a temporary name, and the intermediate table to the original name
echo -e "==> Rename ${dataTab} To ${dataTab}_tmp \n "
${hive} -e " alter table ${dataTab} rename to ${dataTab}_tmp ; "
echo -e "==> Rename ${dataTab}_merge To ${dataTab} \n "
${hive} -e " alter table ${dataTab}_merge rename to ${dataTab} ; "
echo -e "==> Table to drop later: ${dataTab}_tmp ; drop table ${dataTab}_tmp ; \n "
else
echo -e "\033[31m => ${dataTab} <= merged fail \n \033[0m"
exit -1 ;
fi
sleep 1
done
wait
For text-format files, you can use hadoop getmerge to merge small files.
Use getmerge to merge the data to the local disk first, then upload it back with put.
hadoop fs -getmerge /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/* /home/hadoop/pdate/20220815
hadoop fs -rm /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*
hadoop fs -mkdir -p /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815
hadoop fs -put /home/hadoop/pdate/20220815 /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/
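The reference script below derives the local target path from the partition path with awk; that parsing can be sanity-checked on its own (the path is the same illustrative one used above):

```shell
#!/bin/bash
# Split a partition path into the table directory name and the partition
# value, using the same awk parsing the reference script uses.
line=/user/hive/warehouse/xxxx.db/xxxx/pdate=20220815
file_name=$(echo "${line}" | awk -F "=" '{print $NF}')       # partition value
table_name=$(echo "${line}" | awk -F "/" '{print $(NF-1)}')  # table directory
echo "local target: /home/hadoop/pdate/${table_name}/${file_name}"
```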
Reference script:
#!/bin/bash
if [ -z "$1" ]; then
echo "you have not specified an input file! Each line of the file should be a partition path"
exit 1
else
echo "the file you input is $1"
fi
lineurl=$(cat $1)
hadoop=/usr/local/service/hadoop/bin/hadoop
localbak=/home/hadoop/pdate
for line in $lineurl
do
echo "Processing: $line "
file_name=`echo $line | awk -F "=" '{print $NF}'`
table_name=`echo $line | awk -F "/" '{print $(NF-1)}'`
mkdir -p ${localbak}/${table_name}
echo "=> Getmerge data to ${localbak}/${table_name}/${file_name} "
${hadoop} fs -getmerge ${line}/* ${localbak}/${table_name}/${file_name}
echo "=> Rm ${line} "
${hadoop} fs -rm ${line}/*
${hadoop} fs -mkdir -p ${line}
echo "=> Put ${localbak}/${table_name}/${file_name} To ${line} "
${hadoop} fs -put ${localbak}/${table_name}/${file_name} ${line}/
${hadoop} fs -test -f ${line}/${file_name}
if [ $? -eq 0 ];then
echo -e "\033[32m $line merged success \n \033[0m"
else
echo -e "\033[31m $line merged fail \n \033[0m"
fi
sleep 1
done
wait
References:
https://jishuin.proginn.com/p/763bfbd631ad
https://cloud.tencent.com/developer/article/1514064
https://developer.aliyun.com/article/952137
https://www.jianshu.com/p/e5e4fd4b039c
Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, please contact cloudcommunity@tencent.com for removal.