首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
Hadoop面试复习系列——HDFS(一)
2
大数据技术之_04_Hadoop学习_01_HDFS_HDFS概述+HDFS的Shell操作(开发重点)+HDFS客户端操作(开发重点)+HDFS的数据流(面试重点)+NameNode和Seconda
3
大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)
4
大数据技术之_05_Hadoop学习_01_MapReduce_MapReduce概述+Hadoop序列化
5
大数据技术之_03_Hadoop学习_01_入门_大数据概论+从Hadoop框架讨论大数据生态+Hadoop运行环境搭建(开发重点)
6
大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客
7
大数据技术之_06_Zookeeper学习_Zookeeper入门+Zookeeper安装+Zookeeper内部原理+Zookeeper实战(开发重点)+企业面试真题
8
大数据技术之_09_Hive学习_复习与总结
9
大数据技术之_07_Hadoop学习_HDFS_HA(高可用)_HA概述+HDFS-HA工作机制+HDFS-HA集群配置+YARN-HA配置+HDFS Federation(联邦) 架构设计
10
大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型
11
大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)
12
大数据技术之_08_Hive学习_05_Hive实战之谷粒影音(ETL+TopN)+常见错误及解决方案
13
大数据技术之_08_Hive学习_02_DDL数据定义(创建/查询/修改/删除数据库+创建表+分区表+修改表+删除表)+DML数据操作(数据导入+数据导出+清除表中数据)
14
大数据技术之_08_Hive学习_03_查询+函数
15
大数据技术之_16_Scala学习_09_函数式编程-高级
16
大数据技术之_09_Flume学习_Flume概述+Flume快速入门+Flume企业开发案例+Flume监控之Ganglia+Flume高级之自定义MySQLSource+Flume企业真实面试题(
17
大数据技术之_13_Azkaban学习_Azkaban(阿兹卡班)介绍 + Azkaban 安装部署 + Azkaban 实战
18
大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
19
大数据技术之_12_Sqoop学习_Sqoop 简介+Sqoop 原理+Sqoop 安装+Sqoop 的简单使用案例+Sqoop 一些常用命令及参数
20
大数据技术之_16_Scala学习_13_Scala语言的数据结构和算法_Scala学习之旅收官之作
21
大数据技术之_19_Spark学习_06_Spark 源码解析 + Spark 通信架构、脚本解析、standalone 模式启动、提交流程 + Spark Shuffle 过程 + Spark 内存
22
大数据技术之_16_Scala学习_04_函数式编程-基础+面向对象编程-基础
23
大数据技术之_14_Oozie学习
24
大数据技术之_26_交通状态预测项目_01
25
大数据技术之_16_Scala学习_02_变量
26
大数据技术之_16_Scala学习_07_数据结构(上)-集合
27
大数据技术之_28_电商推荐系统项目_01
28
大数据技术之_28_电商推荐系统项目_02
29
大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址
30
大数据技术之_16_Scala学习_01_Scala 语言概述
31
大数据技术之_29_MySQL 高級面试重点串讲_02
32
大数据技术之_18_大数据离线平台_05_离线平台项目模块小结
33
大数据技术之_19_Spark学习_06_Spark 源码解析小结
34
大数据技术之_16_Scala学习_05_面向对象编程-中级
35
大数据技术之_16_Scala学习_08_数据结构(下)-集合操作+模式匹配
36
大数据技术之_24_电影推荐系统项目_05_项目系统设计
37
大数据技术之_19_Spark学习_03_Spark SQL 应用解析小结
38
大数据技术之_19_Spark学习_07_Spark 性能调优小结
39
大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结
40
大数据技术之_19_Spark学习_02_Spark Core 应用解析小结
41
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
42
大数据技术之_19_Spark学习_01_Spark 基础解析小结(无图片)
43
大数据技术之_18_大数据离线平台_03_数据处理+工具代码导入+业务 ETL 实现+创建数据库表
44
大数据技术之_24_电影推荐系统项目_02_Python 基础语法复习
45
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
46
大数据技术之_27_电商平台数据分析项目_01_大数据的框架回顾 + 大数据的企业应用
47
大数据技术之_23_Python核心基础学习_03_函数 + 对象(12.5小时)
48
大数据技术之_32_大数据面试题_01_Hive 基本面试 + Hive 数据分析面试 + Flume + Kafka 面试
49
大数据技术之_23_Python核心基础学习_04_ 异常 + 文件(3.5小时)
50
大数据技术之_16_Scala学习_03_运算符+程序流程控制
清单首页hadoop文章详情

大数据技术之_05_Hadoop学习_01_MapReduce_MapReduce概述+Hadoop序列化

第1章 MapReduce概述

1.1 MapReduce定义

1.2 MapReduce优缺点

1.2.1 优点

1.2.2 缺点

1.3 MapReduce核心思想

MapReduce核心编程思想,如下图所示。

详解如下:

  • 1)分布式的运算程序往往需要分成至少2个阶段。
  • 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
  • 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
  • 4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

总结:分析WordCount数据流走向,深入理解MapReduce核心思想。

1.4 MapReduce进程

1.5 官方WordCount源码

  采用反编译工具【jd-gui.exe】反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是Hadoop自身封装的序列化类型。

1.6 常用数据序列化类型

常用的数据类型对应的Hadoop数据序列化类型

1.7 MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。

1.8 WordCount案例实操

1、需求 在给定的文本文件中统计输出每一个单词出现的总次数 (1)输入数据 hello.txt

代码语言:javascript
复制
atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop

(2)期望输出数据

代码语言:javascript
复制
atguigu    2
banzhang    1
cls    2
hadoop    1
jiao    1
ss    2
xue    1

2、需求分析 按照MapReduce编程规范,分别编写Mapper,Reducer,Driver,如下图所示。

3、环境准备 (1)创建Maven工程

不使用骨架创建Maven工程

填写信息

(2)在pom.xml文件中添加如下依赖

代码语言:javascript
复制
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>

(3)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

代码语言:javascript
复制
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.编写程序 (1)编写Mapper类

代码语言:javascript
复制
package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper阶段
 * 
 * @author bruce
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1、获取一行
        String line = value.toString();

        // 2、按照空格切割
        String[] words = line.split(" ");

        // 3、循环输出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

(2)编写Reducer类

代码语言:javascript
复制
package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer阶段
 * 
 * @author bruce
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1、加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2、输出
        v.set(sum);
        context.write(key, v);
    }
}

(3)编写Driver驱动类

代码语言:javascript
复制
package com.atguigu.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args)
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // 1、获取配置信息对象以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、设置jar的加载路径
        job.setJarByClass(WordcountDriver.class);

        // 3、设置map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4、设置map输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5、设置最终输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6、设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、提交job
        // job.submit();
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5、本地测试 (1)如果电脑系统是win7的就将win7的hadoop jar包解压到非中文路径,并在Windows环境上配置HADOOP_HOME环境变量。如果是电脑win10操作系统,就解压win10的hadoop jar包,并配置HADOOP_HOME环境变量。 注意:win8电脑和win10家庭版操作系统可能有问题,需要重新编译源码或者更改操作系统。

(2)在Eclipse/Idea上运行程序

控制台出现了如下相关异常:

代码语言:javascript
复制
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

解决方案一:拷贝hadoop.dll文件(文件位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目录C:\Windows\System32。个别同学电脑可能还需要修改Hadoop源码。(方案一:亲测有效) 解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下

(3)Debug调试

6、在集群上测试 (0)用maven打jar包,需要添加打包插件依赖 注意:标记红颜色的部分需要替换为自己工程主类。

代码语言:javascript
复制
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin </artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.mr.WordcountDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:如果工程上显示红叉。在项目上右键->Maven->Update Project即可。 (1)将程序打成jar包,然后拷贝到Hadoop集群中   步骤详情:右键->Run as->Maven install。等待编译完成就会在项目的target文件夹中生成jar包。如果看不到。在项目上右键->Refresh,即可看到。修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群。 (2)启动Hadoop集群

代码语言:javascript
复制
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh 
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh 

(3)执行WordCount程序

代码语言:javascript
复制
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop jar wc.jar com.atguigu.mr.wordcount.WordcountDriver /user/atguigu/input/ /user/atguigu/output1/

第2章 Hadoop序列化

2.1 序列化概述

2.2 自定义bean对象实现序列化接口(Writable)

  在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。 具体实现bean对象序列化步骤如下7步。 (1)必须实现Writable接口。 (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造。

代码语言:javascript
复制
public FlowBean() {
    super();
}

(3)重写序列化方法。

代码语言:javascript
复制
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}

(4)重写反序列化方法。

代码语言:javascript
复制
@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致。 (6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。 (7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

代码语言:javascript
复制
@Override
public int compareTo(FlowBean o) {
    // 倒序排列,从大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

2.3 序列化案例实操

1、需求   统计每一个手机号耗费的总上行流量、下行流量、总流量。 (1)输入数据 phone_data.txt

代码语言:javascript
复制
1    13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
2    13846544121 192.196.100.2                   264 0   200
3     13956435636 192.196.100.3                   132     1512    200
4     13966251146 192.168.100.1                   240 0   404
5     18271575951 192.168.100.2   www.atguigu.com 1527    2106    200
6     84188413    192.168.100.3   www.atguigu.com 4116    1432    200
7     13590439668 192.168.100.4                   1116    954     200
8     15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9     13729199489 192.168.100.6                   240 0   200
10     13630577991 192.168.100.7   www.shouhu.com  6960    690     200
11     15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12     15959002129 192.168.100.9   www.atguigu.com 1938    180     500
13     13560439638 192.168.100.10                  918     4938    200
14     13470253144 192.168.100.11                  180     180     200
15     13682846555 192.168.100.12  www.qq.com      1938    2910    200
16     13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17     13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18     18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19     13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20     13768778790 192.168.100.17                  120     120     200
21     13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22     13568436656 192.168.100.19                  1116    954     200

(2)输入数据格式

代码语言:javascript
复制
7     13560436666 120.196.100.99      1116         954            200
id    手机号码        网络ip          上行流量     下行流量     网络状态码

(3)期望输出数据格式

代码语言:javascript
复制
13560436666         1116                954             2070
手机号码            上行流量            下行流量         总流量

2、需求分析

3、编写MapReduce程序 (1)编写流量统计的Bean对象

代码语言:javascript
复制
package com.atguigu.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// 1、实现writable接口
public class FlowBean implements Writable {

    private long upFlow; // 上行流量
    private long downFlow;  // 下行流量
    private long sumFlow; // 总流量

    // 2 、反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // 3、 写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 4、反序列化方法
    // 5、反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6、重写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

(2)编写Mapper类

代码语言:javascript
复制
package com.atguigu.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1、获取一行:7 13560436666     120.196.100.99      1116    954     200
        String line = value.toString();

        // 2、切隔字段
        String[] fieids = line.split("\t");

        // 3、封装对象
        // 取出手机号码
        String phoneNum = fieids[1]; // 封装手机号
        k.set(phoneNum);

        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fieids[fieids.length - 3]);
        long downFlow = Long.parseLong(fieids[fieids.length - 2]);

        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // v.set(upFlow, downFlow);

        // 4、写出
        context.write(k, v);
    }
}

(3)编写Reducer类

代码语言:javascript
复制
package com.atguigu.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 写出
        context.write(key, resultBean);
    }
}

(4)编写Driver驱动类

代码语言:javascript
复制
package com.atguigu.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "d:/temp/atguigu/0529/input/inputflow", "d:/temp/atguigu/0529/output2" };

        // 1、获取配置信息,或者获取job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);

        // 3、指定本业务job要使用的Mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4、指定Mapper输出的数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5、指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6、指定job的输入输出原始文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
下一篇
举报
领券