前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce之自定义inputFormat合并小文件

MapReduce之自定义inputFormat合并小文件

作者头像
大数据梦想家
发布2021-01-22 16:54:15
7930
发布2021-01-22 16:54:15
举报

无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。

小文件的优化无非以下几种方式:

  1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
  3. 在mapreduce处理时,可采用combineInputFormat提高效率

本篇博客小菌将大家实现的是上述第二种方式! 先让我们确定程序的核心机制:

自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV 在输出时使用SequenceFileOutPutFormat输出合并文件

具体的代码如下:

自定义InputFromat

代码语言:javascript
复制
public class Custom_FileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {


    /*
   直接返回文件不可切割,保证一个文件是一个完整的一行
    */

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {

        return false;

    }


    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        Custom_RecordReader custom_recordReader = new Custom_RecordReader();


        custom_recordReader.initialize(split,context);


        return custom_recordReader;
    }
}

自定义RecordReader

代码语言:javascript
复制
/**
 * 
 * RecordReader的核心工作逻辑:
 * 通过nextKeyValue()方法去读取数据构造将返回的key   value
 * 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value
 * 
 * 
 * @author
 *
 */
public class Custom_RecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;

    private Configuration conf;

    private BytesWritable bytesWritable = new BytesWritable();

    private boolean pressced = false;


    //初始化

    /**
     *
     * @param split   封装的文件的对象内容
     * @param context 上下文对象
     * @throws IOException
     * @throws InterruptedException
     */

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.conf=context.getConfiguration();

    }

    //读取下一个文件
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {


        if (!pressced){

            //获取文件路径
            Path path = fileSplit.getPath();

            //获取FileSystem对象
            FileSystem fileSystem = null;
            FSDataInputStream inputStream = null;

            try {
                fileSystem = FileSystem.get(conf);

                //读取文件
                inputStream = fileSystem.open(path);

                //初始化一个字节数据,大小为文件的长度
                byte[] bytes = new byte[(int)fileSplit.getLength()];

                //把数据流转换成字节数组
                IOUtils.readFully(inputStream,bytes,0,bytes.length);

                //把 字节数组 转换成 BytesWritable 对象
                bytesWritable.set(bytes,0,bytes.length);


            } catch (Exception e) {
                e.printStackTrace();

            }finally {

                fileSystem.close();

                if (null != inputStream ){
                    inputStream.close();
                }
            }

            pressced = true;
            return true;

        }else{

            return false;
        }

    }

    //获取当前的key值
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();

    }

    //获取当前的value值
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    //获取当前的进程
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return pressced?0:1;
    }

    //关流
    @Override
    public void close() throws IOException {


    }
}

map处理

代码语言:javascript
复制
public class Custom_Mapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {


    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit)context.getInputSplit();

        String name = fileSplit.getPath().getName();

        context.write(new Text(name),value);



    }
}

定义mapreduce处理流程

代码语言:javascript
复制
public class Customer_Driver {

    public static void main(String[] args) throws Exception {


        //1.实例化job对象
        Job job = Job.getInstance(new Configuration(), "Customer_Driver");
        job.setJarByClass(Customer_Driver.class);

        //2.设置输入
        job.setInputFormatClass(Custom_FileInputFormat.class);
        Custom_FileInputFormat.addInputPath(job,new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\5\\自定义inputformat_小文件合并\\input"));

        //3.设置Map
        job.setMapperClass(Custom_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        //4.设置Reduce()
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //5.设置输出
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\2019大数据课程\\DeBug\\测试\\order\\素材\\5\\自定义inputformat_小文件合并\\output4"));
        
        boolean b = job.waitForCompletion(true);
        System.out.println(b?0:1);

    }
}

到这里我们的程序就算完成了。在运行之前,我们先打开我们程序读取的目录,可以看到在input目录下有两个文件。

在这里插入图片描述
在这里插入图片描述

然后运行程序。

在这里插入图片描述
在这里插入图片描述

伴随着成功运行,我们可以再进入到程序输出目录,查看情况。

在这里插入图片描述
在这里插入图片描述

可以发现该文件已经把多个文件的内容合并在了一起,部分内容出现乱码属于正常现象。这是由于该过程属于序列化后的结果,如果我们想要看到文件最初的内容需要后续做反序列化处理!

那么本次分享的内容就到这里了,下期小菌将为大家带来MapReduce之自定义outputFormat的内容,敬请期待!!!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-11-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 自定义InputFromat
  • 自定义RecordReader
  • map处理
  • 定义mapreduce处理流程
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档