每一个成功人士的背后,必定曾经做出过勇敢而又孤独的决定。
放弃不难,但坚持很酷~
前言
这篇文章是我之前在自学 MapReduce 的笔记,想着最近再回顾一下 MapReduce 的原理,于是就整理了一下。
MapReduce 采用的是“分而治之”的数据,当我们处理大规模的数据时,将这些数据拆解成多个部分,并利用集群的多个节点同时进行数据处理,然后将各个节点得到的中间结果进行汇总,经过进一步的计算(该计算也是并行进行的),得到最终结果。
Configuration 是 Hadoop 中五大组件的公用类,org.apache.hadoop.conf.Configuration。这个类是作业的配置信息类,任何作用的配置信息必须通过 Configuration 传递,因为通过 Configuration 可以实现在多个 mapper 和多个 reducer 任务之间共享信息。
Configuration conf = new Configuration();
IntWritable 是 Hadoop 中实现的用于封装 Java 数据类型的类,它的原型是 public IntWritable(int value) 和 public IntWritable() 两种。所以 new IntWritable(1) 是新建了这个类的一个对象,而数值 1 这是参数。在 Hadoop 中它相当于 java 中 Integer 整形变量,为这个变量赋值为 1 。
在 wordCount 这个程序中,后面有语句 context.writer(word, one),即将分割后的字符串形成键值对,<单词,1>,就是这个意思。
如下图所示,有两份文件,经过分片处理之后,会被分成三个分片(split1,split2,split3)。依次作为map阶段的输入。
下图有三行文本,经过分片处理之后,产生了三个分片,每个分片就是一行的三个单词,分别作为 map 阶段的输入。
Split 阶段的输出作为 Map 阶段的输入,一个分片对应一个 Map 任务。在 Map 阶段中,读取 value 值,将 value 值拆分为的形式。key 为 每个单词,value 为 1。
Map 阶段需要考虑 key 是什么,value 是什么。特别是 key ,他将作为后面 reduce 的依据。输出结果例如:<Deer, 1>,<River, 1>,<Bear, 1>,<Bear, 1>。
Map 阶段的输出会作为 Shuffle 阶段的输入。
自定义 map 继承 Mapper ,重写 Mapper 中的方法 map(Object key, Text value, Context context) 。key 和 value 表示输入的 key 和 value ,处理后的数据写入 context,使用方法 context.write(key, value) ,这里的 key 和 value 会传递给下一个过程。
Mapper 参数类型有以下几种:
第一、二个表示输入 map 的 key 和 value ,从 InputFormat 传过来的,key 为每行文本首地址相对于整个文本首地址的偏移量,value 默认是一行。
第三、四个表示输出的 key 和 value 。
可优化点:可以自定义一个合并函数,hadoop 在 Map 阶段会调用它对本地数据进行预合并,可以减少后面的数据传输量和计算量。
Shuffer 阶段过程比较复杂,可以理解为从 Map 输出到 Reduce 输入的过程,而且涉及到网络传输。
将 Map 中 key 相同的都归置到一起,作为一个 Reduce 的输入。输出结果例如:
可优化点:虽然 shuffle 阶段有默认规则,但我们也可以通过自定义分区函数来优化我们的算法。
将 key 相同的数据进行累计。输出结果例如:<Beer, 3>。
这是我之前写的 wordCount 代码,可以通过代码再了解一下 MapReduce 的流程。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 单词统计
*/
public class WordCountRunner {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.set("mapreduce.framework.name", "yarn");//集群的方式运行,非本地运行
config.set("mapreduce.app-submission.cross-platform", "true");//意思是跨平台提交,在windows下如果没有这句代码会报错 "/bin/bash: line 0: fg: no job control",去网上搜答案很多都说是linux和windows环境不同导致的一般都是修改YarnRunner.java,但是其实添加了这行代码就可以了。
config.set("mapreduce.job.jar","D:\\ideaWorkSpace\\hadoop\\out\\artifacts\\hadoop_jar\\hadoop.jar");
Job job = Job.getInstance(config);
job.setJarByClass(WordCountRunner.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//要处理的数据输入与输出地址
FileInputFormat.setInputPaths(job,"hdfs://lyz01:8020/test1/word.txt");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss");
FileOutputFormat.setOutputPath(job,new Path("hdfs://lyz01:8020/output/"+ simpleDateFormat.format(new Date(System.currentTimeMillis()))));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
public static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String words[] = line.split(" ");
//遍历数组words
for(String word : words){
context.write(new Text(word),new IntWritable(1));
}
}
}
public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable value : values){
count += value.get();
}
context.write(key,new IntWritable(count));
}
}
}
job.setOutputKeyClass 和 job.setOutputValueClass 在默认情况下是同时设置 map 阶段和 reduce 阶段的输出,也就是说只有 map 和 reduce 输出是一样的时候才不会出问题。
当 map 和 reduce 的输出类型不一样时,就需要通过 job.setMapOutputKeyClass 和 job.setMapOutputValueClass 来设置 map 阶段的输出。
总的来说,MapReduce 分为四个过程,分别是 Split、Map、Shuffle、Reduce 这四个阶段。
如果有人再问你 MapReduce 工作原理的话,可以将上面的话说给他听。