前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop的分布式计算系统MapReduce

Hadoop的分布式计算系统MapReduce

作者头像
姜同学
发布2022-10-27 16:37:57
5620
发布2022-10-27 16:37:57
举报
文章被收录于专栏:姜同学姜同学

一.序列化

在MapReduce中要求被传输的数据能够被序列化 MapReduce中的序列化机制使用的是AVRO,MapReduce对AVRO进行了封装 被传输的类实现Writable接口实现方法即可

二.mapreduce 排序

代码语言:javascript
复制
在MapReduce中会自动对被传输的key值进行排序,如果使用一个对象
作为输出键,那么要求对象相对应的类应该实现Comparable接口,考虑到
MapReduce中被传输的对象要求被序列化,所以MapReduce中提供了WritableComparable
接口.
如果ComparaTo方法中返回值为0,则MapReduce在进行计算时会把两个键的值放到
一个迭代器中,输出是第二个key是没有记录的。

mapreduce 分区

代码语言:javascript
复制
我们在使用MapReduce对HDFS中的数据进行计算时,有时可能会有分类
输出的场景,MapReduce中提供了Partitioner类,我们在使用时只需继承
该类,然后重写getPartition方法即可,分区编号默认从0开始。
有多少个分区JobTracter就会分配多少个reduceTask。分区数量要在
驱动类中指定,如果不指定分区类与ReduceTask的数量,则使用默认
的HashPartitioner类进行分区,也就是自定义的分区无效。

mapreduce 合并

代码语言:javascript
复制
1.合并是减少数据总量并没有改变计算结果 - Combiner(合并)实际上只是
让MapTask进行提前聚合,最后ReduceTask在进行总的聚合.
2.并不是所有的场景都适合于用Combiner,像求和、求最值、去重等可以使用
combiner,但是例如求平均的场景不适合与使用Combiner

inputFormat

inputFormat用来获取切片并创建流来读取数据,如果不自定义InputFormat 则默认使用TextInputFormt按行读取Mapper获取的key值为当前行数的偏移量, 自定义inputFormat类只需要继承FileInputFormat类自定义读取文件的即可.

Code

案例:对HDFS中的case.txt文件按月份对每个人的收益进行降序排序

inputformat类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;

// 泛型表示读完之后给mapper的数据类型
public class CaseInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new CaseReader();
    }
}

class CaseReader extends RecordReader<Text,Text>{

    private LineReader reader;
    private Text key;
    private Text value;

    // 初始化Reader
    // 最终目的就是拿到读取切片的流
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        FileSplit fs = (FileSplit) inputSplit;
        Path path = fs.getPath();

        // 连接HDFS获取切片
        FileSystem fileSystem = FileSystem.get(URI.create(path.toString()), taskAttemptContext.getConfiguration());
        // 打开对应的文件获取输入流
        InputStream in = fileSystem.open(path);
       // 字节流转换为字符流按行读取
        reader = new LineReader(in);
    }

    // 判断有无下一个键值对
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        key = new Text();
        value = new Text();
        Text tmp = new Text();

        // 将读取到的一行数据传入到这个参数中 返回值为读取到字节数
        if (reader.readLine(tmp) == ) {
            return false;
        }
        key.set(tmp);

        if (reader.readLine(tmp) == ) {
            return false;
        }
        value.set(tmp);

        return true;
    }

    // 获取当前key值
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // 获取当前value值
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // 获取进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return ;
    }

    // 关流
    @Override
    public void close() throws IOException {
        if (reader != null)
            reader.close();
    }
}
Case类用来封装读取的文本
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Case implements WritableComparable<Case> {
    private int month;

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    private String name;
    private int inCome;
    private int outCome;
    private int countCome;

    public int getInCome() {
        return inCome;
    }

    public void setInCome(int inCome) {
        this.inCome = inCome;
    }

    public int getOutCome() {
        return outCome;
    }

    public void setOutCome(int outCome) {
        this.outCome = outCome;
    }

    public int getCountCome() {
        return inCome - outCome;
    }

    public void setCountCome(int countCome) {
        this.countCome = countCome;
    }

    @Override
    public int compareTo(Case o) {
        return o.getCountCome() - this.getCountCome();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(month);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(inCome);
        dataOutput.writeInt(outCome);
        dataOutput.writeInt(countCome);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        month = dataInput.readInt();
        name = dataInput.readUTF();
        inCome = dataInput.readInt();
        outCome = dataInput.readInt();
        countCome = dataInput.readInt();
    }
}
Mapper类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CaseMapper extends Mapper<Text,Text,Case,NullWritable> {
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String[] s = key.toString().split(" ");
        String[] s1 = value.toString().split(" ");
        Case aCase = new Case();
        aCase.setMonth(Integer.parseInt(s[]));
        aCase.setName(s[]);
        aCase.setInCome(Integer.parseInt(s1[]));
        aCase.setOutCome(Integer.parseInt(s1[]));

        context.write(aCase,NullWritable.get());
    }
}
Reducer类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CaseReducer extends Reducer<Case, NullWritable,Text, IntWritable> {
    @Override
    protected void reduce(Case key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new Text(key.getName()),new IntWritable(key.getCountCome()));
    }
}
Driver类
代码语言:javascript
复制
package com.jmy.profitcase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CaseDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job 提交给JobTracter
        Job job = Job.getInstance();

        // 入口类
        job.setJarByClass(CaseDriver.class);
        // mapper类
        job.setMapperClass(CaseMapper.class);
        // reducer类
        job.setReducerClass(CaseReducer.class);
        // inputformat类
        job.setInputFormatClass(CaseInputFormat.class);
        // partitioner类 reduceTask数量
        job.setPartitionerClass(CasePartition.class);
        job.setNumReduceTasks();

        // mapper
        job.setMapOutputKeyClass(Case.class);
        job.setMapOutputValueClass(NullWritable.class);
        // reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 处理文件路径
        FileInputFormat.addInputPath(job,new Path("hdfs://10.42.99.103:9000/case.txt"));
        // 文件输出路径
        FileOutputFormat.setOutputPath(job,new Path("hdfs://10.42.99.103:9000/caseResult"));

        // 启动
        job.waitForCompletion(true);
    }
}
case.txt
代码语言:javascript
复制
1 ls 
2850 100
2 ls
3566 200
3 ls 
4555 323
1 zs 
19000 2000
2 zs 
28599 3900
3 zs 
34567 5000
1 ww 
355 10
2 ww 
555 222
3 ww 
667 192

数据本地化策略

代码语言:javascript
复制
1.job会被提交到JobTracker,JobTracker会访问HDFS中的namenode,
获取Block的存储位置以及大小.
2.JobTracker收到文件信息之后会对文件进行切片,默认Block的大小
就是切片的大小,切片的数量决定了mapTask的数量。
3.JobTacker计算完mapTask与ReduceTask的数量之后会把任务提交给TaskTracker
,为了减少集群间节点间的访问,TaskTracker会与datanode部署在同一个节点上
4. JobTracker在分配任务的时候,会尽量将任务分配给有数据的节点
  • 如果是空文件,则整个文件作为一个切片处理
  • 在MapReduce中,文件有可切和不可切的区分。在MapReduce中,默认文件是可切的,但是有些文件处理的时候不能切分,这个时候需要手动设置为不可切,例如压缩包
  • 如果文件不可切,则整个文件作为一个切片处理
  • 计算切片大小的公式为Math.min(minSize,Math.max(spilteSize,maxSize))
  • 在进行切片计算的时候底层有一个阈值为1.1

Job任务提交流程

代码语言:javascript
复制
1.客户端将任务提交给JobTracker:hadoop jar ***.jar
2.准备阶段:
  a.检查输入路径是否存在,输出路径是否不存在
  b.计算切片数量以及分区
  c.如果有需要,可以设置分布式缓存存根账户
  d.将jar包提交到HDFS上
  e.将任务提交到JobTracker上
3. 提交阶段
a. JobTracker会计算MapTask的数量和ReduceTask的数量。
MapTask的数量由切片数量决定,ReduceTask的数量由分区数量决定
b. JobTracker在划分好之后,会等待TaskTracker的心跳。
当收到TaskTracker的心跳的时候,JobTracker就会将MapTask或者
ReduceTask分配给TaskTracker。在分配的时候,MapTask要尽量满足
数据本地化策略;ReduceTask尽量分配到相对空闲的节点上
c. TaskTracker在领取到任务之后,去连接HDFS下载对应的jar包
体现的逻辑移动数据固定的思想
d. TaskTracker会在本节点上开启一个JVM子进程执行MapTask或者
ReduceTask。每一个MapTask或者ReduceTask的执行都会开启一个JVM
子进程

Shuffle

一、Map端的Shuffle
代码语言:javascript
复制
1. Mapper中的map方法在处理完一行数据之后,会将数据写出到缓冲区中
2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine合并 - 采取了快速排序的方式
3. 每一个MapTask自带一个缓冲区,缓冲区本质上是一个环形的字节数组。设置为环形的优势在于能够重复利用缓冲区而不用寻址
4. 缓冲区是维系在内存中,缓冲区的默认容量是100M
5. 缓冲区的容量使用达到一定限度(溢写阈值:0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞)的时候,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中
6. 每一次溢写都会产生一个新的溢写文件。单个溢写文件中的数据是分区且排序的,但是所有的溢写文件中的数据是局部有序整体无序
7. 当MapTask将所有数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。如果一部分结果在缓冲区中,一部分结果在溢写文件中,这个时候所有的结果会直接合并到最后的final out中。如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out中
8. 在merge过程中,数据会再次进行分区和排序,所以final out是整体分区且有序。这个过程中的排序使用的是归并排序。如果指定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine
二、Reduce端的Shuffle
代码语言:javascript
复制
1. 每一个ReduceTask启动fetch线程通过get请求去抓取数据
2. 在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据。在抓取到数据的之后,会将数据存储在本地的磁盘上
3. 在抓取完成之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在合并过程中,数据会再次进行排序,采取的是归并排序
4. 合并完成之后,会将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group),形成的结构就是一个键对应一个迭代器每一个键触发一次reduce方法
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-06T,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一.序列化
  • 二.mapreduce 排序
  • mapreduce 分区
  • mapreduce 合并
  • inputFormat
  • Code
    • inputformat类
      • Case类用来封装读取的文本
        • Mapper类
          • Reducer类
            • Driver类
              • case.txt
              • 数据本地化策略
              • Job任务提交流程
              • Shuffle
                • 一、Map端的Shuffle
                  • 二、Reduce端的Shuffle
                  相关产品与服务
                  数据保险箱
                  数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档