首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hive 处理已经存在的小文件方案

hive 处理已经存在的小文件方案

原创
作者头像
王旭平
修改2023-07-04 14:49:53
8.1K0
修改2023-07-04 14:49:53
举报
文章被收录于专栏:大数据生态大数据生态

方案一

归档,archive。Hive 具有内置支持,可将现有分区中的文件转换为 Hadoop 存档(HAR),这样一个曾经由 100 个文件组成的分区只能占用约 3 个文件(取决于设置)。

然而,归档之后只能查询,不支持更新、写入操作。

set hive.archive.enabled=true;
set hive.archive.har.parentdir.settable=true;
set har.partfile.size=1099511627776;

#对表的某个分区进行归档
alter table test_rownumber2 archive partition(dt='20230324');

#解档
alter table test_rownumber2 unarchive partition(dt='20230324');

参考:https://www.docs4dev.com/docs/zh/apache-hive/3.1.1/reference/LanguageManual_Archiving.html

方案二

对于orc文件,可以使用 hive 自带的 concatenate 命令,自动合并小文件

#对于非分区表
alter table A concatenate;

#对于分区表
alter table B partition(dt='2021-05-07',hr='12') concatenate;

注意: 

1、concatenate 命令只支持 RCFILE 和 ORC 文件类型。 

2、使用concatenate命令合并小文件时不能指定合并后的文件数量,但可以多次执行该命令。 

3、当多次使用concatenate后文件数量不在变化,这个跟参数 mapreduce.input.fileinputformat.split.minsize=256mb 的设置有关,可设定每个文件的最小size。

方案三 (推荐)

使用 sql, insert overwrite  重新生成数据。

通过创建和原表表结构的中间表,再将数据从原表导入到中间表。确认数据一致后将中间表改名为原表名,将原表修改为临时表名,最后删除临时表。

#1.创建临时表(创建临时表时需和原表的表结构一致),
create table test.test_table_hive_merge like test.test_table_hive;

#如果存储在cos、ofs上,创建表后可能需要修改存储路径,默认是hdfs路径。
alter table test_table_hive_merge set location 'cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive_merge';


#2.设置合并文件相关会话参数
set hive.exec.dynamic.partition=true; 
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=3000;
set hive.exec.max.dynamic.partitions.pernode=5000;
set hive.merge.mapfiles=true ;
set hive.merge.mapredfiles=true ;	
set hive.merge.tezfiles=true ;
set hive.merge.sparkfiles=true ;
set hive.merge.smallfiles.avgsize=256000000;
set hive.merge.size.per.task=512000000;


#3.合并文件至临时表中,执行前保证没有数据写入原表
#如果有多级分区,将分区名放到partition中
INSERT OVERWRITE TABLE test.test_table_hive_merge  partition(batch_date) SELECT * FROM test.test_table_hive;


#4.查看原表和临时表数据量
set hive.compute.query.using.stats=false ;
set hive.fetch.task.conversion=none;
SELECT count(*) FROM test.test_table_hive;
SELECT count(*) FROM test.test_table_hive_merge;


#5.确认表数据一致后,将原表修改名称为临时表,将中表修改为原表名,使用alter语句。
alter table test.test_table_hive rename to test.test_table_hive_tmp;
alter table test.test_table_hive_merge rename to test.test_table_hive;


#6.查看合并后的分区数和小文件数量
hdfs dfs -count  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive
hdfs dfs -du -h  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608
hdfs dfs -du -h  cosn://hadoop-test-cos-1251458/warehouse/test.db/test_table_hive/batch_date=20210608 | wc -l


#7.观察一段时间后再删除临时表
drop  table test.test_table_hive_tmp ;

注修改hive表名的时候,对应表的存储路径会发生变化,如果有任务上传数据到具体路径,需要注意可能需要修改。

相关脚本:

#!/bin/bash

if [ ! -n "$1" ] ;then
    echo "you have not input a file! The file content is the table name, default.test "
else
    echo "the file you input is $1"
fi

hive=/usr/local/service/hive/bin/hive
#使用beeline的话需要修改hs2的地址
#hive=' /usr/local/service/hive/bin/beeline --showDbInPrompt=true -u "jdbc:hive2://10.0.0.30:7001" -n hadoop --outputformat=tsv2 --showHeader=false --silent=true '
#主要传递的参数
hive_sets=" set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.max.dynamic.partitions=3000; set  hive.exec.max.dynamic.partitions.pernode=5000; set hive.merge.mapfiles=true; set hive.merge.mapredfiles=true; set hive.merge.tezfiles=true; set hive.merge.sparkfiles=true; set hive.merge.smallfiles.avgsize=256000000; set hive.merge.size.per.task=512000000; "
#临时分区存储文件
touch /tmp/partitions_tmp.txt  ;

#最大支持深度4级分区
funGetPart(){
    pCount=$1 ;
    tName=$2 ;
    taHe=`$3 -n 1 /tmp/partitions_tmp.txt ` ;
    if [ ${pCount} -eq 2 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1}')
    elif [ ${pCount} -eq 4 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3}')
    elif [ ${pCount} -eq 6 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3","$5}')
    elif [ ${pCount} -eq 8 ];then
        partition=$(echo  ${taHe} | awk -F '=|/' '{print $1","$3","$5","$7}')
    else
        echo -e "\033[31m Parsing is not supported  \n \033[0m"
        exit -1 ;
    fi

}

lineTables=$(cat $1)

for dataTab in ${lineTables}
do
    #判断表名是否为预期的,预期为 库名.表名
    dataCount=` echo ${dataTab}| awk -F "." '{print NF}' ` 
    if [ ${dataCount} -ne 2  ];then
       echo -e "\033[31m The file content is incorrect!!! \033[0m"
       echo -e "\033[31m For example: default.test  \n \033[0m"
       exit -1 ;
    fi

    #判断表的分区是否正常
    $hive  -e "SHOW PARTITIONS ${dataTab} ; " > /tmp/partitions_tmp.txt
    if [ ! -s /tmp/partitions_tmp.txt ];then
        echo -e "\033[31m The table => ${dataTab} <= no partitione information \n \033[0m"
        exit -1 ;
    fi
    ##先判断提取的2行分区个数
    tpartCount=`(tail -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`
    hpartCount=`(head -n 1 /tmp/partitions_tmp.txt | awk -F '=|/' '{print NF}')`    
    if [ "${tpartCount}" = "" ] || [ "${hpartCount}" = "" ];then
        echo -e "\033[31m The table => ${dataTab} <= no partitione information \n \033[0m"
        exit -1 ;
    fi
    if [ ${tpartCount} -ne ${hpartCount} ];then
        echo -e "\033[31m The table => ${dataTab} <= have different number of partitions \n \033[0m"
        exit -1 ;
    fi
    ##再判断提取的2行分区信息
    funGetPart ${hpartCount} ${dataTab} head
    hpartValue=${partition}
    funGetPart ${tpartCount} ${dataTab} tail
    tpartValue=${partition}
    if [ "${hpartValue}" != "${tpartValue}" ];then
        echo -e "\033[31m The table => ${dataTab} <= have different value of partitions \n \033[0m"
        exit -1 ;
    fi    


    echo "当前处理的表: ${dataTab} , 分区:${tpartValue} "
    echo -e "==> Create table :  ${dataTab}_merge  from: ${dataTab} \n "  
    #当已经存在中间表的时候,选择是否继续执行,默认是退出执行
    #${hive} -e "create table if not exists ${dataTab}_merge like ${dataTab} ; "
    ${hive} -e "create table ${dataTab}_merge like ${dataTab} ; "
    if [ $? -ne 0 ];then
       echo -e "\033[31m  ${dataTab}_merge Creat fail \n \033[0m" ;
       exit -1 ;
    fi
    echo -e "==> Overwrite table :  ${dataTab}_merge , partition: ${tpartValue} , from: ${dataTab} \n "  
    ${hive} -e " ${hive_sets} INSERT OVERWRITE TABLE  ${dataTab}_merge partition(${tpartValue}) SELECT * FROM ${dataTab} ; "
    if [ $? -eq 0 ];then
       echo -e "\033[32m  ${dataTab}_merge Overwrite success \n \033[0m" ;
       #将原表修改名称为临时表,将中间表修改为原表名
       echo -e "==> Rename  ${dataTab} To ${dataTab}_tmp \n " 
       ${hive} -e " alter table ${dataTab} rename to ${dataTab}_tmp ; "
       echo -e "==> Rename  ${dataTab}_merge To ${dataTab} \n " 
       ${hive} -e " alter table  ${dataTab}_merge rename to ${dataTab} ; "
       echo -e "==> 后期需要删除的表名: ${dataTab}_tmp ; drop table ${dataTab}_tmp \n ; " 
    else
       echo -e "\033[31m => ${dataTab} <= merged fail \n \033[0m" 
       exit -1 ;
    fi

sleep 1
done
wait

方案四

对于txt格式的文件可以使用 hadoop getmerge 来合并小文件。

使用  getmerge 命令先合并数据到本地,再通过put上传数据回去。

hadoop fs -getmerge  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*  /home/hadoop/pdate/20220815

hadoop fs -rm  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*

hadoop fs -mkdir -p /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815

hadoop fs -put  /home/hadoop/pdate/20220815  /user/hive/warehouse/xxxx.db/xxxx/pdate=20220815/*

相关脚本参考

#!/bin/bash

if [ ! -n "$1" ] ;then
    echo "you have not input a file!"
else
    echo "the file you input is $1"
fi

lineurl=$(cat $1)

hadoop=/usr/local/service/hadoop/bin/hadoop

localbak=/home/hadoop/pdate

for line in $lineurl
do

    echo "当前处理:  $line "
    file_name=`echo $line | awk -F "=" '{print $NF}'`
    table_name=`echo $line | awk -F "/" '{print $(NF-1)}'`

    mkdir -p ${localbak}/${table_name}
    echo "=> Getmerge  data to ${localbak}/${table_name}/${file_name} "  
    ${hadoop} fs -getmerge  ${line}/*  ${localbak}/${table_name}/${file_name}
    echo "=> Rm ${line} "
    ${hadoop} fs -rm ${line}/*
    ${hadoop} fs -mkdir -p ${line}
    echo "=> Put  ${localbak}/${table_name}/${file_name}  To  ${line}  "  
    ${hadoop} fs -put ${localbak}/${table_name}/${file_name}  ${line}/

    ${hadoop} fs -test -f ${line}/${file_name}
    if [ $? -eq 0 ];then
       echo -e "\033[32m $line merged success \n \033[0m" 
    else
       echo -e "\033[31m $line merged fail \n \033[0m" 
    fi

sleep 1
done
wait

参考:

https://jishuin.proginn.com/p/763bfbd631ad

https://cloud.tencent.com/developer/article/1514064

https://developer.aliyun.com/article/952137

https://www.jianshu.com/p/e5e4fd4b039c

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 方案一
  • 方案二
  • 方案三 (推荐)
  • 方案四
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档