前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop 利用ToolRunner运行MapReduce

Hadoop 利用ToolRunner运行MapReduce

作者头像
smartsi
发布2019-08-07 09:53:05
8060
发布2019-08-07 09:53:05
举报
文章被收录于专栏:SmartSi

大多数人通常使用通过静态 main 方法执行驱动程序代码创建他们的 MapReduce 作业。这种实现的缺点是大多数特定的配置(如果有的话)通常都是硬编码的(例如:设置Reducer的个数)。如果需要随时修改一些配置属性(例如:修改Reducer数量),就必须修改代码,然后重新构建你的jar文件并重新部署应用程序。这种方式很浪费时间。这可以通过在 MapReduce 驱动程序代码中实现 Tool 接口来避免。

1. Hadoop配置

通过实现 Tool 接口并扩展 Configured 类,你可以通过 GenericOptionsParser 轻松的在命令行界面设置 hadoop 配置对象。这使得你的代码更加具有可移植性(并且更加简洁),因为你不需要再对任何特定配置进行硬编码。

让我们举几个例子,使用和不使用Tool接口。

1.1 不使用 Tool 接口
代码语言:javascript
复制
package com.sjf.open.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


/**
 * WordCount示例
 * @author sjf0115
 * @Date Created in 上午10:01 18-6-1
 */
public class WordCountNoTool{

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("./xxxx <input> <output> but got " + args.length + ":");
            for(String argument : args){
                System.out.println("Output: " + argument);
            }
            System.exit(1);
        }
        String inputPaths = args[0];
        String outputPath = args[1];

        Configuration conf = new Configuration();
        conf.set("mapred.job.queue.name", "xxx");

        Job job = Job.getInstance(conf);
        job.setJobName("word_count_example");
        job.setJarByClass(WordCountNoTool.class);

        // mapper
        job.setMapperClass(mapper.class);

        // reducer
        job.setReducerClass(reducer.class);
        job.setNumReduceTasks(2);

        // input
        FileInputFormat.setInputPaths(job, inputPaths);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // output
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("\\s+");
            Text text = new Text();
            IntWritable intWritable = new IntWritable();
            for(String word : words){
                text.set(word);
                intWritable.set(1);
                // <word, 1>
                context.write(text, intWritable);
            }
        }
    }

    public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for(IntWritable intWritable : values){
                count += intWritable.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        WordCountNoTool wordCountNoTool = new WordCountNoTool();
        int result = wordCountNoTool.run(args);
        System.exit(result);
    }
}

如下方式执行 MapReduce 作业。你期望在这里只有2个参数 inputPath 和 outputPath,可以通过 main方法String数组上的索引[0]和[1]获取:

代码语言:javascript
复制
hadoop jar common-tool-jar-with-dependencies.jar com.sjf.open.example.WordCountNoTool ${inputPath} ${outputPath}

在这种情况下,reducer 的个数硬编码在代码中(job.setNumReduceTasks(2)),因此无法根据需要进行修改。

1.2 使用 Tool 接口
代码语言:javascript
复制
package com.sjf.open.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


/**
 * WordCount示例
 * @author sjf0115
 * @Date Created in 上午10:01 18-6-1
 */
public class WordCountWithTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("./xxxx <input> <output>");
            System.exit(1);
        }
        String inputPaths = args[0];
        String outputPath = args[1];

        Configuration conf = this.getConf();
        conf.set("mapred.job.queue.name", "xxxx");

        Job job = Job.getInstance(conf);
        job.setJobName("word_count_example");
        job.setJarByClass(WordCountWithTool.class);

        // mapper
        job.setMapperClass(mapper.class);

        // reducer
        job.setReducerClass(reducer.class);

        // input
        FileInputFormat.setInputPaths(job, inputPaths);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // output
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("\\s+");
            Text text = new Text();
            IntWritable intWritable = new IntWritable();
            for(String word : words){
                text.set(word);
                intWritable.set(1);
                // <word, 1>
                context.write(text, intWritable);
            }
        }
    }

    public static class reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for(IntWritable intWritable : values){
                count += intWritable.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new WordCountWithTool(), args);
        System.exit(result);
    }
}

ToolsRunner 通过其静态 run 方法执行 MapReduce 作业。在这个例子中,我们不需要对 reducer的个数进行硬编码,因为它可以直接可以在命令行中指定(使用-D选项):

代码语言:javascript
复制
hadoop jar common-tool-jar-with-dependencies.jar com.sjf.open.example.WordCountWithTool -D mapred.reduce.tasks=1 ${inputPath} ${outputPath}

请注意,你仍然需要提供 inputPath 和 outputPath 两个参数。GenericOptionParser 可以把通用 Tools 选项与实际作业的参数分开。无论你提供多少个通用选项,inputPath 和 outputPath 变量仍位于索引[0]和[1]处,但位于 run 方法String数组中(不是在 main 方法String数组中)。

如果不实现 Tool 接口运行 MapReduce 作业: hadoop jar common-tool-jar-with-dependencies.jar com.sjf.open.example.WordCountNoTool -D mapred.reduce.tasks=1 ${inputPath} ${outputPath}

-D mapred.reduce.tasks=1 也会被 main 方法认为是一个参数,位于索引[0]处,inputPath 和 outputPath 则分别位于索引[1]和[2]处。

2. 支持通用选项

可以从CLI提供一些其他有用的选项。

代码语言:javascript
复制
-conf 指定应用程序配置文件
-D 给指定属性设置值
-fs 指定 namenode
-files 指定复制到集群上的文件,以逗号分隔
-libjars 指定包含在类路径中的jar文件,以逗号分隔

原文:https://hadoopi.wordpress.com/2013/06/05/hadoop-implementing-the-tool-interface-for-mapreduce-driver/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Hadoop配置
    • 1.1 不使用 Tool 接口
      • 1.2 使用 Tool 接口
      • 2. 支持通用选项
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档