专栏首页YoungGyhadoop_入门1

hadoop_入门1

体系结构

概述

hadoop主要包括两部分:

  1. hdfs,文件操作系统
  2. mapreduce,分布式的计算框架

读的过程,客户端先从namenode读取metadata,然后根据metadata知道所需文件对应的数据块,以及数据块对应的datanode的位置。然后读取。

hdfs

主要由3部分组成

  1. block
  2. namenode
  3. datanode

hdfs的文件被分成块进行存储,块的默认大小是64MB,块是文件存储处理的逻辑单元。

两类节点,namenode和datanode。

namenode是管理节点,存放文件元数据。

  1. 文件与数据块的映射表
  2. 数据块与数据节点的映射表

datanode是HDFS的工作节点,存放数据块。

mapreduce

job and task

job分发给每个节点的task,具体有map task和reduce task

每个datanode都伴随着一个tasktracker,这样让计算跟着数据走,减少了很大的开销。

jobtracker 作业调度 分配任务,监控任务的执行进度 监控tasktracker的状态

tasktracker 执行任务 向jobtracker汇报状态

yarn

yarn是一个资源管理器,是在hadoop 2.0后添加的主要部件。

cloudera

读取文件

数据管理与容错

容错机制 1. 重复执行 2. 推测执行,有一个算的慢的话再找一个和它一起算,保证reduce不会因为map没做完不开始效率低。

数据块复制

数据块有副本,默认数据块保留三份。

心跳检测

二级namenode,主要用于备份

HDFS特点

  1. 数据冗余,硬件容错
  2. 流式的数据访问,一次写入,多次读
  3. 存储大文件,如果大量的小文件那么对namenode上的元数据存储压力会很大

适用性和局限性

  1. 适合数据批量读写,吞吐量高;
  2. 不适合交互式应用,低延迟很难满足
  3. 适合一次写入多次读取,顺序读写
  4. 不支持多用户并发写相同文件

应用

单词计数

计算文件中每个单词的频数 输出结果按照字母顺序排序

  1. 编写WordCount.java,包含Mapper类和Reducer类
  2. 编译WordCount.java,javac -classpath need1.jar:need2.jar -d directory WordCount.java
  3. 打包 jar -cvf WordCount.jar *.class
  4. 作业提交 hadoop jar WordCount.jar WordCount input output
//WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

排序

对reduce进行分区

//Sort.java
import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    public static class Map extends
            Mapper<Object, Text, IntWritable, IntWritable> {

        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            data.set(Integer.parseInt(line));

            context.write(data, new IntWritable(1));

        }

    }

    public static class Reduce extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            for (IntWritable val : values) {

                context.write(linenum, key);

                linenum = new IntWritable(linenum.get() + 1);
            }

        }
    }

    public static class Partition extends Partitioner<IntWritable, IntWritable> {

        @Override
        public int getPartition(IntWritable key, IntWritable value,
                int numPartitions) {
            int MaxNumber = 65223;
            int bound = MaxNumber / numPartitions + 1;
            int keynumber = key.get();
            for (int i = 0; i < numPartitions; i++) {
                if (keynumber < bound * i && keynumber >= bound * (i - 1))
                    return i - 1;
            }
            return 0;
        }
    }

    /**
     * @param args
     */

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage WordCount <int> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setPartitionerClass(Partition.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hadoop安装教程_单机及伪分布式

    配置新的hadoop用户 查看是否配置成功 为hadoop用户增加管理员权限 退出重新登陆

    用户1147754
  • 聚类算法简述

    K-MEANS 算法 K-MEANS 评估聚类结果与选择K MapReduce GMM 算法 初始化 过拟合 K-MEANS比较 LDA LDA和cluster...

    用户1147754
  • ML基石_2_LearnAnswer2

    h(x) = sign((\sum_{i=1}^n w_i x_i)-threshold) 其中,xix_i是数据xx的第ii个分量,wiw_i是不同分量...

    用户1147754
  • Hadoop3单机和伪分布式模式安装配置

    为了体验HDFS和MapReduce框架,以及在HDFS上运行示例程序或简单作业,我们首先需要完成单机上的Hadoop安装。所依赖的软件环境如下:

    职场亮哥
  • 爬取B站18000条《黑神话:悟空》实机演示弹幕,做成词云

    从不畏惧死亡,只是不忍世道沦丧。哪怕前途多尸骨,身后无退路—— 这个世界,总有勇敢的生命,再次踏上取经之途。由游戏科学开发的西游题材单机·动作·角色扮演游戏《黑...

    松鼠爱吃饼干
  • Netty 实现简单的RPC远程调用 原

    RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也太多已这两种为主

    chinotan
  • ERROR Shell:396 - Failed to locate the winutils binary in the hadoop binary path java.io.IOE...

    ERROR Shell:396 - Failed to locate the winutils binary in the hadoop binary path...

    bboy枫亭
  • guava并发工具

    一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承...

    IT大咖说
  • 磁盘阵列典型问题分析

    1.在服务器往盘阵中写入或读出数据时报错(如I/0 error,读写延缓失败等),或不能写入数据,或写入过程中出错

    党志强
  • Hadoop安装教程_单机及伪分布式

    配置新的hadoop用户 查看是否配置成功 为hadoop用户增加管理员权限 退出重新登陆

    用户1147754

扫码关注云+社区

领取腾讯云代金券