hadoop_入门1

体系结构

概述

hadoop主要包括两部分:

  1. hdfs,文件操作系统
  2. mapreduce,分布式的计算框架

读的过程,客户端先从namenode读取metadata,然后根据metadata知道所需文件对应的数据块,以及数据块对应的datanode的位置。然后读取。

hdfs

主要由3部分组成

  1. block
  2. namenode
  3. datanode

hdfs的文件被分成块进行存储,块的默认大小是64MB,块是文件存储处理的逻辑单元。

两类节点,namenode和datanode。

namenode是管理节点,存放文件元数据。

  1. 文件与数据块的映射表
  2. 数据块与数据节点的映射表

datanode是HDFS的工作节点,存放数据块。

mapreduce

job and task

job分发给每个节点的task,具体有map task和reduce task

每个datanode都伴随着一个tasktracker,这样让计算跟着数据走,减少了很大的开销。

jobtracker 作业调度 分配任务,监控任务的执行进度 监控tasktracker的状态

tasktracker 执行任务 向jobtracker汇报状态

yarn

yarn是一个资源管理器,是在hadoop 2.0后添加的主要部件。

cloudera

读取文件

数据管理与容错

容错机制 1. 重复执行 2. 推测执行,有一个算的慢的话再找一个和它一起算,保证reduce不会因为map没做完不开始效率低。

数据块复制

数据块有副本,默认数据块保留三份。

心跳检测

二级namenode,主要用于备份

HDFS特点

  1. 数据冗余,硬件容错
  2. 流式的数据访问,一次写入,多次读
  3. 存储大文件,如果大量的小文件那么对namenode上的元数据存储压力会很大

适用性和局限性

  1. 适合数据批量读写,吞吐量高;
  2. 不适合交互式应用,低延迟很难满足
  3. 适合一次写入多次读取,顺序读写
  4. 不支持多用户并发写相同文件

应用

单词计数

计算文件中每个单词的频数 输出结果按照字母顺序排序

  1. 编写WordCount.java,包含Mapper类和Reducer类
  2. 编译WordCount.java,javac -classpath need1.jar:need2.jar -d directory WordCount.java
  3. 打包 jar -cvf WordCount.jar *.class
  4. 作业提交 hadoop jar WordCount.jar WordCount input output
//WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

排序

对reduce进行分区

//Sort.java
import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    public static class Map extends
            Mapper<Object, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            data.set(Integer.parseInt(line));

            context.write(data, new IntWritable(1));

        }

    }

    public static class Reduce extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            for (IntWritable val : values) {

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get() + 1);
            }

        }
    }

    public static class Partition extends Partitioner<IntWritable, IntWritable> {

        @Override
        public int getPartition(IntWritable key, IntWritable value,
                int numPartitions) {
            int MaxNumber = 65223;
            int bound = MaxNumber / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 0; i < numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1))
                    return i - 1;
            }
            return 0;
        }
    }

    /**
     * @param args
     */

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage WordCount <int> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setPartitionerClass(Partition.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Jack的Android之旅

刨解OkHttp之缓存机制

时间一晃而过,今天想给大家带来OkHttp的zuihou最后一篇文章,主要讲一下OkHttp的缓存机制。OkHttp的责任链中有一个拦截器就是专门应对OkHtt...

1132
来自专栏圣杰的专栏

ABP入门系列(4)——创建应用服务

一、解释下应用服务层 应用服务用于将领域(业务)逻辑暴露给展现层。展现层通过传入DTO(数据传输对象)参数来调用应用服务,而应用服务通过领域对象来执行相应的业务...

2347
来自专栏Golang语言社区

Go-简洁的并发

多核处理器越来越普及。有没有一种简单的办法,能够让我们写的软件释放多核的威力?是有的。随着Golang, Erlang, Scala等为并发设计的程序语言的兴起...

39812
来自专栏智能大石头

线程池ThreadPool及Task调度机制分析

近1年,偶尔发生应用系统启动时某些操作超时的问题,特别在使用4核心Surface以后。笔记本和台式机比较少遇到,服务器则基本上没有遇到过。

680
来自专栏恰同学骚年

Hadoop学习笔记—8.Combiner与自定义Combiner

  在第四篇博文《初识MapReduce》中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步骤,如下图所示:

661
来自专栏大内老A

WCF后续之旅(10): 通过WCF Extension实现以对象池的方式创建Service Instance

我们知道WCF有3种典型的对service instance进行实例化的方式,他们分别与WCF的三种InstanceContextMode相匹配,他们分别是Pe...

1858
来自专栏同步博客

降低Redis内存占用

  Redis为列表、集合、散列、有序集合提供了一组配置选项,这些选项可以让redis以更节约的方式存储较短的结构。

511
来自专栏wOw的Android小站

[Objective-C]深入理解GCD

GCD(Grand Central Dispatch)是libdispatch的市场名称,而libdispatch作为Apple的一个库,为并发代码在多核硬件(...

471
来自专栏大内老A

Enterprise Library深入解析与灵活应用(8):WCF与Exception Handling AppBlock集成[下]

在上篇中,我详细介绍了如何通过自定义ClientMessageInspector和ErrorHandler,实现WCF与微软企业库中的Exception Han...

17110
来自专栏IT技术精选文摘

排查Java的内存问题

核心要点 排查Java的内存问题可能会非常困难,但是正确的方法和适当的工具能够极大地简化这一过程; Java HotSpot JVM会报告各种OutOfMemo...

5745

扫码关注云+社区