专栏首页米虫的家BigData--MapReduce进阶(一)之框架原理

BigData--MapReduce进阶(一)之框架原理

MapReduce进阶(一)–框架原理

1、InputFormat

MapReduce数据流

2、MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

  • 1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
  • 2)每一个Split切片分配一个MapTask并行实例处理
  • 3)默认情况下,切片大小=BlockSize
  • 4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

3、Job提交流程源码解析

4、FileInputFormat切片源码解析(input.getSplits(job))

1)源码解析
2)切片机制
  • (1)简单地按照文件的内容长度进行切片
  • (2)切片大小,默认等于Block大小
  • (3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
3)切片大小的参数配置

5、小文件切片–CombineTextInputFormat切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。

(1)虚拟存储过程:

将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

(2)切片过程:

Code

(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。

(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
	最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

6、自定义InputFormat

1) WholeFileInputFormat 继承FileInputFormat

java

package cn.buildworld.mapreduce.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-25 16:12
 */
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new WholeFileRecordReader();
    }
}
2)自定义RecordReader–WholeFileRecordReader

java

package cn.buildworld.mapreduce.inputformat;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * @author MiChong
 * @date 2020-05-25 16:15
 * <p>
 * 自定义RecordReader,处理一个文件,把这个文件直接读成 一个KV值
 */
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private boolean notRead = true;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private FSDataInputStream inputStream;
    private Path path;
    private FileSplit fs;

    /**
     * 初始化方法,框架会在开始的时候调用一次
     *
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //转换切片类型到文件切片
        fs = (FileSplit) split;
        //通过切片获取路径
        path = fs.getPath();
        //通过路径获取文件系统
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        //开流
        inputStream = fileSystem.open(path);
    }

    /**
     * 读取下一组KV值
     *
     * @return 如果读到了,返回true,读完了,返回False
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (notRead) {
            // 具体读文件的过程
            //读key
            key.set(fs.getPath().toString());

            //读value
            byte[] buf = new byte[(int) fs.getLength()];
            inputStream.read(buf);
            value.set(buf, 0, buf.length);

            notRead = false;
            return true;
        } else {

            return false;
        }
    }

    /**
     * 获取到当前的key
     *
     * @return 当前的key
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * 获取当前读到的Value
     *
     * @return 当前Value
     * @throws IOException
     * @throws InterruptedException
     */

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * 当前数据读取的进度
     *
     * @return 当前进度
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    /**
     * 关闭资源
     *
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(inputStream);
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Mac做java开发(四):​hadoop分布式环境搭建

    大数据时代,分布式技术至关重要,因此,这篇文章介绍hadoop分布式环境搭建,作为个人学习大数据技术的实验环境。

    用户5473628
  • 分布式计算—MapReduce、Spark、Storm、Flink分别适用什么场景

    链接:https://www.zhihu.com/question/403840013/answer/1317631316

    大数据老哥
  • 大数据学习必备 | 推荐几个牛X 的 github 项目,助你事半功倍

    大家好,我是 梦想家 Alex 。我们都知道 github 对于程序员们而言,就是一个巨大的“聚宝盆”,上面不仅有很多优质的开源项目,还有很多热...

    大数据梦想家
  • [大数据架构 ]Apache大数据项目目录

    在使用BigData大约8年以上之后,我遇到了大量的项目。Esp Apache的运动对于BigData域非常强大。每个人都会提出一个针对特定解决方案的项目。但是...

    首席架构师智库
  • 超详细步骤!整合Apache Hudi + Flink + CDH

    使用Idea打开Hudi项目,更改packging/hudi-flink-bundle的pom.xml文件,修改flink-bundle-shade-hive2...

    ApacheHudi
  • 2019精炼的大数据技术学习路线

    近年来大数据BigData、人工智能AI、物联网Iot等行业发展迅猛,很多人都想要从事大数据技术开发工作,但是,请问要怎么做,路线是什么?从哪里开始学?学哪些?...

    用户2292346
  • 数读 | 大数据与Hadoop不得不说的事

    大数据在近些年来越来越火热,人们在提到大数据遇到了很多相关概念上的问题,比如云计算、Hadoop等等。那么,大数据是什么、Hadoop是什么,Hadoop和大数...

    CDA数据分析师
  • Storm简介及Storm集群的安装部署

    (1)Storm简介 Storm最早是由BackType公司开发的实时处理系统,底层由Clojure实现。Clojure也是一门基于JVM的高级面向函数式的编...

    魏晓蕾
  • Sqoop学习之路

    Sqoop (SQL to Hadoop) 是Apache顶级项⽬,官⽹地址:http://sqoop.apache.org.

    白贺
  • 【Spark】Spark Local 及 Spark On Standalone 环境搭建

    (1)解压spark安装包 $ cd /opt/softwares/cdh cdh]$ tar -zxf spark-1.6.1-bin-2.5.0-cdh...

    魏晓蕾
  • 靠谱的数据开发从业指南No.82

    有小伙伴让我聊聊数据开发的职业规划和从业指南,因为数据开发从业人员的知识量实在是太太太大了,今天恰好这个机会好好聊聊。

    大蕉
  • 大数据入门:MapReduce基本原理

    在围绕Hadoop形成的大数据技术生态当中,MapReduce的地位,在早期是处于核心地位的,但是伴随着数据处理实时性需求的不断提升,更多新的计算框架出现,Ma...

    成都加米谷大数据
  • 大数据常用技术栈

    提起大数据,不得不提由IBM提出的关于大数据的5V特点:Volume(大量)、Velocity(高速)、Variety(多样)、Value(低价值密度)、Ver...

    大数据学习与分享
  • 大数据常用技术栈

    提起大数据,不得不提由IBM提出的关于大数据的5V特点:Volume(大量)、Velocity(高速)、Variety(多样)、Value(低价值密度)、Ver...

    大数据学习与分享
  • MapReduce 的核心知识点,你都 get 到了吗 ?(干货文章,建议收藏!)

    众所周知,Hadoop 中最核心的两大组件就是 HDFS 和 MapReduce。其中 HDFS 提供了承载海量数据存储的能力,而 MapRed...

    大数据梦想家
  • Hadoop-HA高可用搭建

    2、hosts文件要配置好,三台都可以相互通过主机名ping通,三台虚拟机都要配置!如下:

    可爱见见
  • 2021年大数据Hadoop(十六):MapReduce计算模型介绍

    MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景...

    Lanson
  • 大数据学习之路05——Hadoop原理与架构解析

    Hadoop 是 Apache 开源组织的一个分布式计算开源框架,是一个可以更容易开发和运行处理大规模数据的解决方案,它提供了一套分布式系统基础架构,允许使用简...

    汪志宾
  • Storm与Spark、Hadoop三种框架对比

    Storm与Spark、Hadoop这三种框架,各有各的优点,每个框架都有自己的最佳应用场景。所以,在不同的应用场景下,应该选择不同的框架。

    用户2292346

扫码关注云+社区

领取腾讯云代金券