Hadoop - MapReduce

作者:tutorialspoint

译者:java达人

来源:https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm(点击文末阅读原文前往)

通过MapReduce框架,我们可以编写应用程序在商用机器集群上以可靠的方式并行处理大量的数据。

MapReduce是什么?

MapReduce是基于java的分布式计算程序模型和处理技术。 MapReduce算法包含两个重要的任务,即Map和Reduce。 Map接受一组数据并将其转换为另一组数据,这些独立的元素分解成元组(键/值对)。 然后是reduce任务,它接受map的输出作为输入,并将这些数据元组组成一组更小的元组。 就像MapReduce的名字所暗示的那样,reduce任务总是在map之后执行。

MapReduce的主要优势是,它很容易在多个计算节点上作大规模的数据处理。 在MapReduce模式下,数据处理原语被称为mappers和reducers。 将数据处理应用程序分解为mappers和reducers有时是不容易的。 但是,一旦我们以MapReduce形式编写应用程序,那么扩展应用程序,让它运行在成百上千,甚至上万的机器集群中只是一个修改配置的问题。 正是这一点可伸缩性吸引了许多程序员使用MapReduce模型。

算法

  • 通常MapReduce范例基于将计算实体发送到数据所在的地方。
  • MapReduce程序执行分三个阶段,即map阶段, shuffle阶段,和reduce阶段。

map阶段 :map或mapper的工作是处理输入数据。 一般输入数据是以文件或目录的形式存在,存储在Hadoop文件系统(HDFS)。 输入文件逐行传递给mapper函数。mapper处理数据并创建一些小数据块。

reduce阶段 :这个阶段是Shuffle 阶段和 Reduce阶段的组合。Reducer的工作是处理来自于mapper的数据。 处理完成后,生成一组新的输出存储到HDFS中。

  • MapReduce任务期间,Hadoop 发送Map和Reduce任务给集群中相应的服务器。
  • 该框架管理有关数据传递的所有细节,如发布任务,验证任务完成,在集群的节点之间复制数据。
  • 大部分的计算发生在本地节点,这些节点在磁盘中存储有数据,这减少了网络流量。
  • 给定的任务完成后,由集群归集数据,产生一个适当的结果,并将其发送回Hadoop服务器。

输入和输出(Java视角)

MapReduce框架操作< key,value>对,也就是说,框架将任务的输入视为一组<key,value>对,并生成一组< key,value>对作为任务的输出,只是类型不同。

键和值相关的类应该通过框架进行序列化,需要实现Writable接口。此外,key类必须实现Writable-Comparable接口,使框架方便排序。一个MapReduce任务的输入和输出类型:(Input)< k1,v1 > - >map- > < k2,v2 > - > - >reduce- > < k3,v3 >(Output)。

Input

Output

Map

<k1, v1>

list (<k2, v2>)

Reduce

<k2, list(v2)>

list (<k3, v3>)

术语

  • PayLoad-实现Map和Reduce方法的应用程序,是任务的核心。
  • Mapper - Mapper将输入的键/值对映射为一组中间键/值对。
  • NamedNode -管理Hadoop分布式文件系统(HDFS)的节点。
  • DataNode—在进行任何处理之前提前展示数据的节点。
  • MasterNode—JobTracker运行的节点,并接受来自客户端的任务请求。
  • SlaveNode -Map和Reduce程序运行的节点。
  • JobTracker -调度任务并跟踪分配的任务到任务跟踪器。
  • Task Tracker-跟踪任务,向JobTracker报告任务状态。
  • Job—数据集上执行Mapper和Reducer的程序。
  • Task—在数据块的Mapper或Reducer任务。
  • Task Attempt—尝试在SlaveNode上执行任务的特定实例

示例场景

以下是某机构的用电量的数据。它包含了每月的用电量和几年的平均用电量。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg

1979 23 23 2 43 24 25 26 26 26 26 25 26 25

1980 26 27 28 28 28 30 31 31 31 30 30 30 29

1981 31 32 32 32 33 34 35 36 36 34 34 34 34

1984 39 38 39 39 39 41 42 43 40 39 38 38 40

1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果将上述数据作为输入,我们必须编写应用程序来处理它并产生结果,例如查找最大用电量年、最小用电量年,等等。如果记录数量有限,对程序员来说,这是一个简单的过程。他们将简单地编写逻辑来生成所需的输出,并将数据传递给应用程序。

但是,如果数据展示的是一个特定州的所有大型工业的从开始到现在的电力消耗。

  • 当我们编写应用程序来处理这些批量数据时,程序需要大量的时间来执行。
  • 当我们将数据从源头转移到网络服务器,将会消耗大量的网络流量。

为了解决这些问题,我们需要MapReduce框架。

输入数据

以下数据保存在sample.txt,作为输入数据:

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

示例程序

以下的程序使用MapReduce框架处理样本数据

package hadoop;  import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path;import org.apache.hadoop.conf.*;import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.*;  public class ProcessUnits { 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
   public void map(LongWritable key, Text value, 
   OutputCollector<Text, IntWritable> output,   
   Reporter reporter) throws IOException 
    { 
     String line = value.toString(); 
     String lasttoken = null; 
     StringTokenizer s = new StringTokenizer(line,"\t"); 
     String year = s.nextToken(); 
         
     while(s.hasMoreTokens())
        {
          lasttoken=s.nextToken();
        } 
            
      int avgprice = Integer.parseInt(lasttoken); 
      output.collect(new Text(year), new IntWritable(avgprice)); 
    } 
 } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
    public void reduce( Text key, Iterator <IntWritable> values, 
    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
     { 
       int maxavg=30; 
       int val=Integer.MIN_VALUE; 
            
       while (values.hasNext()) 
        { 
          if((val=values.next().get())>maxavg) 
           { 
            output.collect(key, new IntWritable(val)); 
           } 
         } 
 
      } 
   }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
    JobConf conf = new JobConf(ProcessUnits.class); 
      
    conf.setJobName("max_eletricityunits"); 
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class); 
    conf.setMapperClass(E_EMapper.class); 
    conf.setCombinerClass(E_EReduce.class); 
    conf.setReducerClass(E_EReduce.class); 
    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 
      
    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
    JobClient.runJob(conf); 
   }  } 

将上面的程序保存为ProcessUnits.java。程序的编译和执行解释如下

编译和单元程序的执行过程

假设我们进入Hadoop用户的主目录中(例如,/home/Hadoop)。

按照下面的步骤编译并执行上面的程序。

步骤1

下面的命令是创建一个目录来存储已编译的java类。

$ mkdir units

步骤2

下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载jar。假设下载的文件夹是/home/hadoop/。

步骤3

下面的命令用于编译ProcessUnits.java,为程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java

$ jar -cvf units.jar -C units/ .

步骤4

下面的命令用于在HDFS中创建一个输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步骤5

下面的命令用于复制名为sample的输入文件。txtin是HDFS的输入目录。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

步骤6

下面的命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤7

下面的命令是用于从输入的目录获取输入文件,运行eleunit_max应用。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段时间,直到文件被执行。执行后,如下图所示,输出将包含输入细分的数目、Map任务的数量、reducer任务的数量等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
File System Counters 
FILE: Number of bytes read=61 
FILE: Number of bytes written=279400 
FILE: Number of read operations=0 
FILE: Number of large read operations=0   
FILE: Number of write operations=0 
HDFS: Number of bytes read=546 
HDFS: Number of bytes written=40 
HDFS: Number of read operations=9 
HDFS: Number of large read operations=0 
HDFS: Number of write operations=2 Job Counters 
   Launched map tasks=2  
   Launched reduce tasks=1 
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137 
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613 
   Total time spent by all reduce tasks (ms)=44120 
   Total vcore-seconds taken by all map tasks=146137 
   Total vcore-seconds taken by all reduce tasks=44120 
   Total megabyte-seconds taken by all map tasks=149644288 
   Total megabyte-seconds taken by all reduce tasks=45178880 
Map-Reduce Framework 
Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208 
   Combine input records=5  
   Combine output records=5 
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224
File Output Format Counters 
   Bytes Written=40 

步骤8

下面的命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤9

下面的命令是用来看part- 00000文件输出。这个文件是由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

下面是MapReduce程序生成的输出。

1981 34

1984 40

1985 45

步骤10

下面的命令用来从HDFS复制输出文件夹到本地文件系统,用作分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

重要命令

所有Hadoop命令都由$HADOOP_HOME/bin/hadoop命令调用。 运行Hadoop脚本不加任何参数会打印所有命令的描述。

Usage : hadoop [--config confdir] COMMAND

下表罗列了可用选项及其描述

Options

Description

namenode -format

Formats the DFS filesystem.

secondarynamenode

Runs the DFS secondary namenode.

namenode

Runs the DFS namenode.

datanode

Runs a DFS datanode.

dfsadmin

Runs a DFS admin client.

mradmin

Runs a Map-Reduce admin client.

fsck

Runs a DFS filesystem checking utility.

fs

Runs a generic filesystem user client.

balancer

Runs a cluster balancing utility.

oiv

Applies the offline fsimage viewer to an fsimage.

fetchdt

Fetches a delegation token from the NameNode.

jobtracker

Runs the MapReduce job Tracker node.

pipes

Runs a Pipes job.

tasktracker

Runs a MapReduce task Tracker node.

historyserver

Runs job history servers as a standalone daemon.

job

Manipulates the MapReduce jobs.

queue

Gets information regarding JobQueues.

version

Prints the version.

jar <jar>

Runs a jar file.

distcp <srcurl> <desturl>

Copies file or directories recursively.

distcp2 <srcurl> <desturl>

DistCp version 2.

archive -archiveName NAME -p

Creates a hadoop archive.

<parent path> <src>* <dest>

classpath

Prints the class path needed to get the Hadoop jar and the required libraries.

daemonlog

Get/Set the log level for each daemon

如何与mapreduce任务交互

以下是Hadoop任务中可用的通用选项。

GENERIC_OPTIONS

Description

-submit <job-file>

Submits the job.

-status <job-id>

Prints the map and reduce completion percentage and all job counters.

-counter <job-id> <group-name> <countername>

Prints the counter value.

-kill <job-id>

Kills the job.

-events <job-id> <fromevent-#> <#-of-events>

Prints the events' details received by jobtracker for the given range.

-history [all] <jobOutputDir> - history < jobOutputDir>

Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option.

-list[all]

Displays all jobs. -list displays only jobs which are yet to complete.

-kill-task <task-id>

Kills the task. Killed tasks are NOT counted against failed attempts.

-fail-task <task-id>

Fails the task. Failed tasks are counted against failed attempts.

-set-priority <job-id> <priority>

Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

查看job状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>  e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

查看job输出历史记录

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

终止job

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 

原文发布于微信公众号 - java达人(drjava)

原文发表时间:2017-07-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏coding...

Flutter 简易新闻项目目标效果对比简介代码代码地址

使用flutter快速开发 Android 和 iOS 的简易的新闻客户端 API使用的是 showapi(易源数据) 加载热门微信文章

20620
来自专栏wannshan(javaer,RPC)

ConcurrentHashMap 锁分段 源码分析

看ConcurrentHashMap下几个属性: /** * The default concurrency level for this table...

42260
来自专栏我是攻城师

ElasticSearch之Java Api聚合分组实战

48360
来自专栏xingoo, 一个梦想做发明家的程序员

Oozie调度报错——ORA-00918:未明确定义列

Oozie在执行sqoop的时候报错,同样的SQL在sqoop中可用,在oozie中不可用: Caused by: java.sql.SQLSyntaxErro...

247100
来自专栏数据之美

关于 xargs 参数被截断,tar 文件被覆盖的问题

问题: 目录下共 2W+ 个小文件: $ find . -type f | wc -l   20083   如果我们这样打包,会爆出 "Arg...

23860
来自专栏Jed的技术阶梯

利用Sqoop实现Hive的数据与MySQL数据的互导

注意: 在sqoop-1.4.6以前,从MySQL中导出数据到hive表中,不能指定文件格式为parquet,只能先导入到HDFS,在从HDFS上load p...

1.1K20
来自专栏linux驱动个人学习

高通Audio中ASOC的codec驱动(二)

继上一篇文章:高通Audio中ASOC的machine驱动(一) ASOC的出现是为了让codec独立于CPU,减少和CPU之间的耦合,这样同一个codec驱动...

1.1K60
来自专栏移动开发面面观

ProgressiveJpeg介绍与在Android中的使用

26840
来自专栏祝威廉

PySpark如何设置worker的python命令

因为最近在研究spark-deep-learning项目,所以重点补习了下之前PySpark相关的知识,跟着源码走了一遍。希望能够对本文的读者有所帮助。

10620
来自专栏Hadoop数据仓库

HAWQ技术解析(九) —— 外部数据

        HAWQ不但可以读写自身系统中的表,而且能够访问HDFS、Hive、HBase等外部系统的数据。这是通过一个名为PXF的扩展框架实现的。大部分外...

263100

扫码关注云+社区

领取腾讯云代金券