前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据学习(二)-------- MapReduce

大数据学习(二)-------- MapReduce

作者头像
大数据流动
发布2019-08-08 11:58:16
3480
发布2019-08-08 11:58:16
举报
文章被收录于专栏:实时计算实时计算

前提已经安装好hadoop的hdfs集群,可以查看

https://cloud.tencent.com/developer/article/1482908

Mapreduce是hadoop的运算框架,可以对hdfs中的数据分开进行计算,先执行很多maptask,在执行reducetask,这个过程中任务的执行需要一个任务调度的平台,就是yarn。

一、安装YARN集群

yarn集群中有两个角色:

主节点:Resource Manager 1台

从节点:Node Manager N台

Resource Manager一般安装在一台专门的机器上

Node Manager应该与HDFS中的data node重叠在一起

修改配置文件:yarn-site.xml

代码语言:javascript
复制
<property>
<name>yarn.resourcemanager.hostname</name>
<value>主机名</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>

然后scp到所有机器,修改主节点hadoop的slaves文件,列入要启动nodemanager的机器,配好免密

然后,就可以用脚本启动yarn集群:

sbin/start-yarn.sh

停止:

sbin/stop-yarn.sh

页面:http://主节点:8088 看看node manager节点是否识别

开发一个提交job到yarn的客户端类,mapreduce所有jar和自定义类,打成jar包上传到hadoop集群中的任意一台机器上,运行jar包中的(YARN客户端类

hadoop jar ......JobSubmitter

二、开发mapreduce程序

注意理解分而治之的思想,先进行map:映射,对应,个数不变。 reduce:化简,合并,将一系列数据,化简为一个值。

主要需要开发:

map阶段的进、出数据,

reduce阶段的进、出数据,

类型都应该是实现了HADOOP序列化框架的类型,如:

String对应Text

Integer对应IntWritable

Long对应LongWritable

例子wordcount代码:

WordcountMapper

代码语言:javascript
复制
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 切单词
        String line = value.toString();
        String[] words = line.split(" ");
        for(String word:words){
            context.write(new Text(word), new IntWritable(1));
            
        }
    }
}

WordcountReducer

代码语言:javascript
复制
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
    
        
        int count = 0;
        
        Iterator<IntWritable> iterator = values.iterator();
        while(iterator.hasNext()){
            
            IntWritable value = iterator.next();
            count += value.get();
        }
        
        context.write(key, new IntWritable(count));
        
    }

}







public class JobSubmitter {
    
    public static void main(String[] args) throws Exception {
        
        // 在代码中设置JVM系统参数,用于给job对象来获取访问HDFS的用户身份
        System.setProperty("HADOOP_USER_NAME", "root");
        
        
        Configuration conf = new Configuration();
        // 1、设置job运行时要访问的默认文件系统
        conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
        // 2、设置job提交到哪去运行
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hdp-01");
        // 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
        conf.set("mapreduce.app-submission.cross-platform","true");
        
        Job job = Job.getInstance(conf);
        
        // 1、封装参数:jar包所在的位置
        job.setJar("d:/wc.jar");
        //job.setJarByClass(JobSubmitter.class);
        
        // 2、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        
        // 3、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        
        
        Path output = new Path("/wordcount/output");
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
        if(fs.exists(output)){
            fs.delete(output, true);
        }
        
        // 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, output);  // 注意:输出路径必须不存在
        
        
        // 5、封装参数:想要启动的reduce task的数量
        job.setNumReduceTasks(2);
        
        // 6、提交job给yarn
        boolean res = job.waitForCompletion(true);
        
        System.exit(res?0:-1);
        
    }
    
    

}

MR还有一些高级的用法:自定义类型,自定义Partitioner,Combiner,排序,倒排索引,自定义GroupingComparator

三、mapreduce与yarn的核心机制

yarn是一个分布式程序的运行调度平台

yarn中有两大核心角色:

1、Resource Manager

接受用户提交的分布式计算程序,并为其划分资源

管理、监控各个Node Manager上的资源情况,以便于均衡负载

2、Node Manager

管理它所在机器的运算资源(cpu + 内存)

负责接受Resource Manager分配的任务,创建容器、回收资源

Mapreduce工作机制:

划分输入切片——》 环形缓冲区 ——》 分区排序 ——》Combiner 局部聚合——》shuffle ——》GroupingComparator——》输出

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-04-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档