hadoop_入门1

体系结构

概述

hadoop主要包括两部分:

  1. hdfs,文件操作系统
  2. mapreduce,分布式的计算框架

读的过程,客户端先从namenode读取metadata,然后根据metadata知道所需文件对应的数据块,以及数据块对应的datanode的位置。然后读取。

hdfs

主要由3部分组成

  1. block
  2. namenode
  3. datanode

hdfs的文件被分成块进行存储,块的默认大小是64MB,块是文件存储处理的逻辑单元。

两类节点,namenode和datanode。

namenode是管理节点,存放文件元数据。

  1. 文件与数据块的映射表
  2. 数据块与数据节点的映射表

datanode是HDFS的工作节点,存放数据块。

mapreduce

job and task

job分发给每个节点的task,具体有map task和reduce task

每个datanode都伴随着一个tasktracker,这样让计算跟着数据走,减少了很大的开销。

jobtracker 作业调度 分配任务,监控任务的执行进度 监控tasktracker的状态

tasktracker 执行任务 向jobtracker汇报状态

yarn

yarn是一个资源管理器,是在hadoop 2.0后添加的主要部件。

cloudera

读取文件

数据管理与容错

容错机制 1. 重复执行 2. 推测执行,有一个算的慢的话再找一个和它一起算,保证reduce不会因为map没做完不开始效率低。

数据块复制

数据块有副本,默认数据块保留三份。

心跳检测

二级namenode,主要用于备份

HDFS特点

  1. 数据冗余,硬件容错
  2. 流式的数据访问,一次写入,多次读
  3. 存储大文件,如果大量的小文件那么对namenode上的元数据存储压力会很大

适用性和局限性

  1. 适合数据批量读写,吞吐量高;
  2. 不适合交互式应用,低延迟很难满足
  3. 适合一次写入多次读取,顺序读写
  4. 不支持多用户并发写相同文件

应用

单词计数

计算文件中每个单词的频数 输出结果按照字母顺序排序

  1. 编写WordCount.java,包含Mapper类和Reducer类
  2. 编译WordCount.java,javac -classpath need1.jar:need2.jar -d directory WordCount.java
  3. 打包 jar -cvf WordCount.jar *.class
  4. 作业提交 hadoop jar WordCount.jar WordCount input output
//WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

排序

对reduce进行分区

//Sort.java
import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    public static class Map extends
            Mapper<Object, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            data.set(Integer.parseInt(line));

            context.write(data, new IntWritable(1));

        }

    }

    public static class Reduce extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            for (IntWritable val : values) {

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get() + 1);
            }

        }
    }

    public static class Partition extends Partitioner<IntWritable, IntWritable> {

        @Override
        public int getPartition(IntWritable key, IntWritable value,
                int numPartitions) {
            int MaxNumber = 65223;
            int bound = MaxNumber / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 0; i < numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1))
                    return i - 1;
            }
            return 0;
        }
    }

    /**
     * @param args
     */

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage WordCount <int> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setPartitionerClass(Partition.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编码小白

tomcat请求处理分析(六)servlet的处理过程

1.1.1.1  servlet的解析过程 servlet的解析分为两步实现,第一个是匹配到对应的Wrapper,第二个是加载对应的servlet并进行数据,这...

7217
来自专栏石奈子的Java之路

原 SpringBoot 2.0 系列00

1434
来自专栏分布式系统进阶

Librdkafka的操作处理队列

2902
来自专栏瓜大三哥

Yaffs_guts(三)

1.垃圾回收 1.static int yaffs_InitialiseBlocks(yaffs_Device *dev,int nBlocks)//块初始化 ...

2265
来自专栏静默虚空的博客

JAVA 设计模式 中介者模式

用途 中介者模式 (Mediator) 用一个中介对象来封装一系列的对象交互。中介者使各对象不需要显示地相互引用,从而使其耦合松散,而且可以独立地改变它们之间...

18310
来自专栏JackieZheng

Spring读书笔记——bean加载

我们的日常开发几乎离不开Spring,他为我们的开发带来了很大的便捷,那么Spring框架是如何做到方便他人的呢。今天就来说说bean如何被加载加载。 我们在x...

2209
来自专栏数据之美

hadoop 里执行 MapReduce 任务的几种常见方式

说明: 测试文件: echo -e "aa\tbb \tcc\nbb\tcc\tdd" > 3.txt hadoop fs -put 3.txt /tm...

2058
来自专栏数据结构与算法

博弈论入门之斐波那契博弈

斐波那契博弈 斐波那契博弈是一种经典的博弈问题 有一堆石子,两个顶尖聪明的人玩游戏,先取者可以取走任意多个,但不能全取完,以后每人取的石子数不能超过上个人的两...

39015
来自专栏恰童鞋骚年

Hadoop学习笔记—4.初识MapReduce

  MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。但对许多开...

932
来自专栏about云

discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下h...

3648

扫码关注云+社区

领取腾讯云代金券