Hadoop总结 - - - - - - - - - - - - - - - - - - - - - - - - - - - - 210
概述 - - - - - - - - - - - - - - - - - - - - - - - - - - - - 211
CDH - - - - - - - - - - - - - - - - - - - - - - - - - - - - 211
安装Hadoop2.6.4 非Zookeeper集群版 - - - - - - - - - - - - - - - 211
安装Hadoop2.6.4 Zookeeper集群版 - - - - - - - - - - - - - - - 216
MapReduce整体的流程详解 - - - - - - - - - - - - - - - - - - - - 225
Hadoop HDFS 系统详解 - - - - - - - - - - - - - - - - - - - - - 226
JAVA 操作HDFS - - - - - - - - - - - - - - - - - - - - - - - - 241
Hadoop MapReduce 实例 - - - - - - - - - - - - - - - - - - - - 248
Hadoop 其他总结 - - - - - - - - - - - - - - - - - - - - - - - - 259
Hadoop 优化总结 - - - - - - - - - - - - - - - - - - - - - - - - 259
在项目中把hadoop安装包中的share包导入到项目中。
package bigDate;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> KEYIN 是指框架读取到的数据的key的类型,
* 在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long VALUEIN 是指框架读取到的数据的value的类型
* , 在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String
*
*
* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,
* 在此wordcount程序中,我们输出的key是单词,所以是String
*
* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定
* 在此wordcount程序中,我们输出的value是单词的数量,所以是Integer
*
* 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架
* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型
*
* Long ----> LongWritable String ----> Text Integer ----> IntWritable Null
* ----> NullWritable
*/
public class WordCountMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
// 这就是mapreduce框架中一个主体运行程序MapTask所要调用的用户业务逻辑方法
// MapTask会驱动InputFormat去读取数据(keyIN,VALUEIN),每读到一个KV对,就传入这个用户写的map方法中调用一次
// 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
package bigDate;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
// reducetask在调我们写的reduce方法
// reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分
// (数据的key.hashcode%reducetask数==本reductask号)
// reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:
// 先将自己收到的所有的kv对按照k分组(根据k是否相同)
// 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
package bigDate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LinuxWordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 告诉框架,我们的程序所在jar包的路径
job.setJar("/root/wordcount.jar");
// 告诉框架,我们的程序所用的mapper类和reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 告诉框架,我们的mapperreducer输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 告诉框架,我们的数据读取、结果输出所用的format组件
// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 告诉框架,我们要处理的文件在哪个路径下
FileInputFormat.setInputPaths(job, new Path("/wordcount/"));
// 告诉框架,我们的处理结果要输出到哪里去
FileOutputFormat.setOutputPath(job, new Path("/wordcountOutput/"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
在开发项目中把项目制作成JAR上传到linux
[root@hadoop1 ~]# hadoop jar wordcount.jar bigDate.LinuxWordCount
***************
File System Counters
FILE: Number of bytes read=241235
FILE: Number of bytes written=2923209
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=196687
HDFS: Number of bytes written=10550
HDFS: Number of read operations=127
HDFS: Number of large read operations=0
HDFS: Number of write operations=11
Map-Reduce Framework
Map input records=773
Map output records=5008
Map output bytes=46713
Map output materialized bytes=56777
Input split bytes=877
Combine input records=0
Combine output records=0
Reduce input groups=603
Reduce shuffle bytes=56777
Reduce input records=5008
Reduce output records=603
Spilled Records=10016
Shuffled Maps =8
Failed Shuffles=0
Merged Map outputs=8
GC time elapsed (ms)=793
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=1370177536
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=26697
File Output Format Counters
Bytes Written=10550
包含所有运算的信息,请仔细阅读。
详细的请查看:http://blog.csdn.net/xfg0218/article/details/52740327
[root@hadoop1 ~]# hadoop fs -cat /wordcountOutput/part-r-00000
2229
"*"18
"AS8
"License");8
"alice,bob18
"kerberos".1
"simple"1
*****************
输出的数据是有序的,原因在于mapreduce中的框架已经把顺序排好了。
在项目中把hadoop安装包中的share包导入到项目中。下载win执行插件
链接:http://pan.baidu.com/s/1slAKyK1 密码:mc2d 如果无法下载请联系作者。
package bigDate;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> KEYIN 是指框架读取到的数据的key的类型,
* 在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long VALUEIN 是指框架读取到的数据的value的类型
* , 在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String
*
*
* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,
* 在此wordcount程序中,我们输出的key是单词,所以是String
*
* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定
* 在此wordcount程序中,我们输出的value是单词的数量,所以是Integer
*
* 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架
* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型
*
* Long ----> LongWritable String ----> Text Integer ----> IntWritable Null
* ----> NullWritable
*/
public class WordCountMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
// 这就是mapreduce框架中一个主体运行程序MapTask所要调用的用户业务逻辑方法
// MapTask会驱动InputFormat去读取数据(keyIN,VALUEIN),每读到一个KV对,就传入这个用户写的map方法中调用一次
// 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
package bigDate;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
// reducetask在调我们写的reduce方法
// reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分
// (数据的key.hashcode%reducetask数==本reductask号)
// reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:
// 先将自己收到的所有的kv对按照k分组(根据k是否相同)
// 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
package bigDate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LinuxWordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 加入winutils.exe工具
System.setProperty("hadoop.home.dir",
"E:\\cenos-6.5-hadoop-2.6.4\\hadoop-2.6.4");
Job job = Job.getInstance(conf);
// 告诉框架,我们的程序所在jar包
job.setJarByClass(WordCountDriver.class);
// 告诉框架,我们的程序所用的mapper类和reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 告诉框架,我们的mapperreducer输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 告诉框架,我们的数据读取、结果输出所用的format组件
// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 告诉框架,我们要处理的文件在哪个路径下
FileInputFormat.setInputPaths(job, new Path(
"D:\\hadoop\\wordCountInput"));
// 告诉框架,我们的处理结果要输出到哪里去
FileOutputFormat.setOutputPath(job, new Path(
"D:\\hadoop\\wordCountOuput"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
****************
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=270651
File Output Format Counters
Bytes Written=107107
详细过程请参考:http://blog.csdn.net/xfg0218/article/details/52741046
Map端与reduce端的数据不变,只需要修改一下客户端的代码即可,修改如下:
package bigDate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class LinuxWordCount {
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir",
"E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
conf.set("mapreduce,framework,name", "yarn");
conf.set("arn.resourcemanager.hostname", "hadoop1");
// conf.set("-DHADOOP_USER_NAME", "root");
Job job = Job.getInstance(conf);
// 告诉框架,我们的程序所在jar包的路径
job.setJarByClass(LinuxWordCount.class);
// 告诉框架,我们的程序所用的mapper类和reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 告诉框架,我们的mapperreducer输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 告诉框架,我们的数据读取、结果输出所用的format组件
// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 告诉框架,我们要处理的文件在哪个路径下
FileInputFormat.setInputPaths(job, new Path("/wordcount/"));
// 告诉框架,我们的处理结果要输出到哪里去
FileOutputFormat.setOutputPath(job, new Path("/wordcountOutput/"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
-DHADOOP_USER_NAME=root
********************
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=26697
File Output Format Counters
Bytes Written=10550
详细过程:http://blog.csdn.net/xfg0218/article/details/52742648