大数据开发之用CombineFileInputFormat优化Hadoop小文件

HDFS设计是用来存储海量数据的,特别适合存储TB、PB量级别的数据。但是随着时间的推移,HDFS上可能会存在大量的小文件,这里说的小文件指的是文件大小远远小于一个HDFS块(128MB)的大小;HDFS上存在大量的小文件至少会产生以下影响:

消耗NameNode大量的内存

延长MapReduce作业的总运行时间

本文将介绍如何在MapReduce作业层面上将大量的小文件合并,以此减少运行作业的Map Task的数量;关于如何在HDFS上合并这些小文件,请参见《Hadoop小文件优化》。

Hadoop内置提供了一个CombineFileInputFormat类来专门处理小文件,其核心思想是:根据一定的规则,将HDFS上多个小文件合并到一个InputSplit中,然后会启用一个Map来处理这里面的文件,以此减少MR整体作业的运行时间。

如果用户没有设置这个参数(默认就是没设置),那么同一个机架上的所有小文件将组成一个InputSplit,最终由一个Map Task来处理;

如果用户设置了这个参数,那么同一个节点(node)上的文件将会组成一个InputSplit。

同一个InputSplit包含了多个HDFS块文件,这些信息存储在CombineFileSplit类中,它主要包含以下信息:

从上面的定义可以看出,CombineFileSplit类包含了每个块文件的路径、起始偏移量、相对于原始偏移量的大小以及这个文件的存储节点,因为一个CombineFileSplit包含了多个小文件,所以需要使用数组来存储这些信息。

现在我们就来看看如何使用CombineTextInputFormat类,如下:

importorg.apache.hadoop.mapreduce.Mapper;

importjava.io.IOException;

importjava.util.List;

/////////////////////////////////////////////////////////////////////

User: 过往记忆

Date:2017-04-25

Time:22:59

bolg: https://www.iteblog.com

本文地址:https://www.iteblog.com/archives/2139

过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

/////////////////////////////////////////////////////////////////////

publicclassHadoopTestextendsConfiguredimplementsTool {

privatestaticfinalLog LOG = LogFactory.getLog(HadoopTest.class);

privatestaticfinallongONE_MB =1024* 1024L;

staticclassTextFileMapperextendsMapper{

@Override

protectedvoidmap(LongWritable key, Text value, Context context)

throwsIOException, InterruptedException {

Configuration configuration = context.getConfiguration();

LOG.warn("#######################"+ configuration.get(MRJobConfig.MAP_INPUT_FILE));

Text filenameKey =newText(configuration.get(MRJobConfig.MAP_INPUT_FILE));

context.write(filenameKey, value);

}

}

publicstaticvoidmain(String[] args)throwsException {

intexitCode = ToolRunner.run(newHadoopTest(), args);

System.exit(exitCode);

}

@Override

publicintrun(String[] args)throwsException {

Configuration conf =newConfiguration(getConf());

Job job = Job.getInstance(conf);

FileInputFormat.setInputPaths(job, args[0]);

FileOutputFormat.setOutputPath(job,newPath(args[1]));

job.setJarByClass(HadoopTest.class);

job.setInputFormatClass(CombineTextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.setMapperClass(TextFileMapper.class);

returnjob.waitForCompletion(true) ?0:1;

}

}

上面的程序很简单,其实就是将HDFS上多个小文件合并到大文件中,并再每行存储了这行数据的文件路径。程序运行的结果如下:

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180227A0CVVJ00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券