前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《快学BigData》--Hadoop总结(G)(40)

《快学BigData》--Hadoop总结(G)(40)

作者头像
小徐
发布2019-08-05 14:41:44
3800
发布2019-08-05 14:41:44
举报
文章被收录于专栏:GreenplumGreenplum

Hadoop总结 - - - - - - - - - - - - - - - - - - - - - - - - - - - - 210

概述 - - - - - - - - - - - - - - - - - - - - - - - - - - - - 211

CDH - - - - - - - - - - - - - - - - - - - - - - - - - - - - 211

安装Hadoop2.6.4 非Zookeeper集群版 - - - - - - - - - - - - - - - 211

安装Hadoop2.6.4 Zookeeper集群版 - - - - - - - - - - - - - - - 216

MapReduce整体的流程详解 - - - - - - - - - - - - - - - - - - - - 225

Hadoop HDFS 系统详解 - - - - - - - - - - - - - - - - - - - - - 226

JAVA 操作HDFS - - - - - - - - - - - - - - - - - - - - - - - - 241

Hadoop MapReduce 实例 - - - - - - - - - - - - - - - - - - - - 248

Hadoop 其他总结 - - - - - - - - - - - - - - - - - - - - - - - - 259

Hadoop 优化总结 - - - - - - - - - - - - - - - - - - - - - - - - 259

Hadoop MapReduce 实例

1-1)、Linux 实例

在项目中把hadoop安装包中的share包导入到项目中。

A)、Map端代码

package bigDate;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**

* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> KEYIN 是指框架读取到的数据的key的类型,

* 在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long VALUEIN 是指框架读取到的数据的value的类型

* , 在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String

*

*

* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,

* 在此wordcount程序中,我们输出的key是单词,所以是String

*

* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定

* 在此wordcount程序中,我们输出的value是单词的数量,所以是Integer

*

* 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架

* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型

*

* Long ----> LongWritable String ----> Text Integer ----> IntWritable Null

* ----> NullWritable

*/

public class WordCountMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

// 这就是mapreduce框架中一个主体运行程序MapTask所要调用的用户业务逻辑方法

// MapTask会驱动InputFormat去读取数据(keyIN,VALUEIN),每读到一个KV对,就传入这个用户写的map方法中调用一次

// 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

for (String word : words) {

context.write(new Text(word), new IntWritable(1));

}

}

}

B)、Reduce 端代码

package bigDate;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

// reducetask在调我们写的reduce方法

// reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分

// (数据的key.hashcode%reducetask数==本reductask号)

// reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:

// 先将自己收到的所有的kv对按照k分组(根据k是否相同)

// 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

int count = 0;

for (IntWritable v : values) {

count += v.get();

}

context.write(key, new IntWritable(count));

}

}

C)、Client 端代码

package bigDate;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LinuxWordCount {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 告诉框架,我们的程序所在jar包的路径

job.setJar("/root/wordcount.jar");

// 告诉框架,我们的程序所用的mapper类和reducer类

job.setMapperClass(WordCountMapper.class);

job.setReducerClass(WordCountReducer.class);

// 告诉框架,我们的mapperreducer输出的数据类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 告诉框架,我们的数据读取、结果输出所用的format组件

// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

// 告诉框架,我们要处理的文件在哪个路径下

FileInputFormat.setInputPaths(job, new Path("/wordcount/"));

// 告诉框架,我们的处理结果要输出到哪里去

FileOutputFormat.setOutputPath(job, new Path("/wordcountOutput/"));

boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);

}

}

D)、上传到Linux

在开发项目中把项目制作成JAR上传到linux

E)、运行wordcount

[root@hadoop1 ~]# hadoop jar wordcount.jar bigDate.LinuxWordCount

***************

File System Counters

FILE: Number of bytes read=241235

FILE: Number of bytes written=2923209

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=196687

HDFS: Number of bytes written=10550

HDFS: Number of read operations=127

HDFS: Number of large read operations=0

HDFS: Number of write operations=11

Map-Reduce Framework

Map input records=773

Map output records=5008

Map output bytes=46713

Map output materialized bytes=56777

Input split bytes=877

Combine input records=0

Combine output records=0

Reduce input groups=603

Reduce shuffle bytes=56777

Reduce input records=5008

Reduce output records=603

Spilled Records=10016

Shuffled Maps =8

Failed Shuffles=0

Merged Map outputs=8

GC time elapsed (ms)=793

CPU time spent (ms)=0

Physical memory (bytes) snapshot=0

Virtual memory (bytes) snapshot=0

Total committed heap usage (bytes)=1370177536

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=26697

File Output Format Counters

Bytes Written=10550

包含所有运算的信息,请仔细阅读。

详细的请查看:http://blog.csdn.net/xfg0218/article/details/52740327

F)、查看信息

[root@hadoop1 ~]# hadoop fs -cat /wordcountOutput/part-r-00000

2229

"*"18

"AS8

"License");8

"alice,bob18

"kerberos".1

"simple"1

*****************

输出的数据是有序的,原因在于mapreduce中的框架已经把顺序排好了。

1-2)、windows 上开发(是单机版程序)

在项目中把hadoop安装包中的share包导入到项目中。下载win执行插件

链接:http://pan.baidu.com/s/1slAKyK1 密码:mc2d 如果无法下载请联系作者。

A)、Map端代码

package bigDate;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**

* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> KEYIN 是指框架读取到的数据的key的类型,

* 在默认的InputFormat下,读到的key是一行文本的起始偏移量,所以key的类型是Long VALUEIN 是指框架读取到的数据的value的类型

* , 在默认的InputFormat下,读到的value是一行文本的内容,所以value的类型是String

*

*

* KEYOUT 是指用户自定义逻辑方法返回的数据中key的类型,由用户业务逻辑决定,

* 在此wordcount程序中,我们输出的key是单词,所以是String

*

* VALUEOUT 是指用户自定义逻辑方法返回的数据中value的类型,由用户业务逻辑决定

* 在此wordcount程序中,我们输出的value是单词的数量,所以是Integer

*

* 但是,String ,Long等jdk中自带的数据类型,在序列化时,效率比较低,hadoop为了提高序列化效率,自定义了一套序列化框架

* 所以,在hadoop的程序中,如果该数据需要进行序列化(写磁盘,或者网络传输),就一定要用实现了hadoop序列化框架的数据类型

*

* Long ----> LongWritable String ----> Text Integer ----> IntWritable Null

* ----> NullWritable

*/

public class WordCountMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

// 这就是mapreduce框架中一个主体运行程序MapTask所要调用的用户业务逻辑方法

// MapTask会驱动InputFormat去读取数据(keyIN,VALUEIN),每读到一个KV对,就传入这个用户写的map方法中调用一次

// 在默认的inputformat实现中,此处的一个key就是一行的起始偏移量,value就是一行的内容

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

for (String word : words) {

context.write(new Text(word), new IntWritable(1));

}

}

}

B)、Reduce 端代码

package bigDate;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

// reducetask在调我们写的reduce方法

// reducetask应该收到了前一阶段(map阶段)中所有maptask输出的数据中的一部分

// (数据的key.hashcode%reducetask数==本reductask号)

// reducetask将这些收到kv数据拿来处理时,是这样调用我们的reduce方法的:

// 先将自己收到的所有的kv对按照k分组(根据k是否相同)

// 将某一组kv中的第一个kv中的k传给reduce方法的key变量,把这一组kv中所有的v用一个迭代器传给reduce方法的变量values

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

int count = 0;

for (IntWritable v : values) {

count += v.get();

}

context.write(key, new IntWritable(count));

}

}

C)、Client 端代码

package bigDate;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LinuxWordCount {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

// 加入winutils.exe工具

System.setProperty("hadoop.home.dir",

"E:\\cenos-6.5-hadoop-2.6.4\\hadoop-2.6.4");

Job job = Job.getInstance(conf);

// 告诉框架,我们的程序所在jar包

job.setJarByClass(WordCountDriver.class);

// 告诉框架,我们的程序所用的mapper类和reducer类

job.setMapperClass(WordCountMapper.class);

job.setReducerClass(WordCountReducer.class);

// 告诉框架,我们的mapperreducer输出的数据类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 告诉框架,我们的数据读取、结果输出所用的format组件

// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

// 告诉框架,我们要处理的文件在哪个路径下

FileInputFormat.setInputPaths(job, new Path(

"D:\\hadoop\\wordCountInput"));

// 告诉框架,我们的处理结果要输出到哪里去

FileOutputFormat.setOutputPath(job, new Path(

"D:\\hadoop\\wordCountOuput"));

boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);

}

}

D)、查看运行过程

****************

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=270651

File Output Format Counters

Bytes Written=107107

详细过程请参考:http://blog.csdn.net/xfg0218/article/details/52741046

1-3)、windows下向Linux集群提交代码

Map端与reduce端的数据不变,只需要修改一下客户端的代码即可,修改如下:

1-1)、客户端修改为

package bigDate;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LinuxWordCount {

public static void main(String[] args) throws Exception {

System.setProperty("hadoop.home.dir",

"E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4");

Configuration conf = new Configuration();

conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

conf.set("mapreduce,framework,name", "yarn");

conf.set("arn.resourcemanager.hostname", "hadoop1");

// conf.set("-DHADOOP_USER_NAME", "root");

Job job = Job.getInstance(conf);

// 告诉框架,我们的程序所在jar包的路径

job.setJarByClass(LinuxWordCount.class);

// 告诉框架,我们的程序所用的mapper类和reducer类

job.setMapperClass(WordCountMapper.class);

job.setReducerClass(WordCountReducer.class);

// 告诉框架,我们的mapperreducer输出的数据类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 告诉框架,我们的数据读取、结果输出所用的format组件

// TextInputFormat是mapreduce框架中内置的一种读取文本文件的输入组件

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

// 告诉框架,我们要处理的文件在哪个路径下

FileInputFormat.setInputPaths(job, new Path("/wordcount/"));

// 告诉框架,我们的处理结果要输出到哪里去

FileOutputFormat.setOutputPath(job, new Path("/wordcountOutput/"));

boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);

}

}

1-2)、在加上root的权限

-DHADOOP_USER_NAME=root

1-3)、查看运行过程

********************

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=26697

File Output Format Counters

Bytes Written=10550

详细过程:http://blog.csdn.net/xfg0218/article/details/52742648

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-03-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 河马coding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hadoop MapReduce 实例
    • 1-1)、Linux 实例
      • A)、Map端代码
      • B)、Reduce 端代码
      • C)、Client 端代码
      • D)、上传到Linux
      • E)、运行wordcount
      • F)、查看信息
    • 1-2)、windows 上开发(是单机版程序)
      • A)、Map端代码
      • B)、Reduce 端代码
      • C)、Client 端代码
      • D)、查看运行过程
    • 1-3)、windows下向Linux集群提交代码
      • 1-1)、客户端修改为
      • 1-2)、在加上root的权限
      • 1-3)、查看运行过程
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档