Hadoop: Chaining Multiple MapReduce2 Jobs

In complex MapReduce processing it is often necessary to break the work down into several simpler Jobs, with the output of one Job serving as the input of the next, so the Jobs depend on one another. Taking the averaging example from the previous post, the computation can be split into three steps:

1. Compute the Sum

2. Compute the Count

3. Compute the average

Each step is treated as a separate Job. Job 3 must wait until Job 1 and Job 2 have finished and takes their outputs as its input. The code below shows how to chain these three Jobs together:

package yjmyzz.mr.job.link;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Job 1: compute the Sum
    public static class SumMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long sum = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // accumulate locally; emit a single record in cleanup()
            sum += Long.parseLong(value.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }

    }

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long sum = 0;

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }

    }

    // Job 2: compute the Count
    public static class CountMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        public long count = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // count one record per input line
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }

    }

    public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public long count = 0;

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }

    }

    // Job 3: compute the Avg from the outputs of Job 1 and Job 2
    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        public long count = 0;
        public long sum = 0;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line is either "SUM\t<value>" or "COUNT\t<value>"
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }

    }


    public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {

        public long sum = 0;
        public long count = 0;

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
        }

    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String inputPath = "/input/duplicate.txt";
        String maxOutputPath = "/output/max/";     // output directory of the Sum job
        String countOutputPath = "/output/count/"; // output directory of the Count job
        String avgOutputPath = "/output/avg/";     // output directory of the Average job

        // Delete the output directories first (optional; avoids the
        // "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, maxOutputPath);
        HDFSUtil.deleteFile(conf, countOutputPath);
        HDFSUtil.deleteFile(conf, avgOutputPath);

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));


        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));


        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use the outputs of job1 and job2 as the input of job3
        FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Run job1 and job2 first; only if both succeed, run job3
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }

    }


}
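
Since all three jobs use Hadoop's default TextOutputFormat, job1 and job2 each write a single tab-separated key/value line (part-r-00000) into their output directories. The numbers below are hypothetical and only show the shape of the intermediate data; the real values depend on duplicate.txt from the previous post:

    SUM     499500        <- /output/max/part-r-00000
    COUNT   1000          <- /output/count/part-r-00000

This tab-separated layout is what AvgMapper relies on when it calls value.toString().split("\t"), and job3's own result under /output/avg/ is a single line consisting of AVG, a tab, and the computed average.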

The input text can be found in the previous post. The main ideas behind the Avg2 code above are:

1. The Sum and Count jobs both read the same input, /input/duplicate.txt, and write their results to /output/max/ and /output/count/ respectively.

2. The Avg job takes the files under /output/max and /output/count as its input, picks up the sum and the count according to the key of each line, and finally computes the average and writes it to /output/avg/.
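
The driver above expresses the dependency simply by running job1 and job2 to completion before submitting job3, which also means job1 and job2 execute one after the other even though they are independent. Hadoop's mapreduce library also offers JobControl and ControlledJob for declaring such dependencies explicitly. The following is only a rough sketch of that alternative, not part of the original program: the class JobChainRunner and its runChain method are invented for illustration, and the three Job objects are assumed to be configured exactly as in Avg2.main().

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobChainRunner {

    /**
     * Runs job1 and job2 (independent of each other) and then job3, which
     * depends on both, using the JobControl/ControlledJob API.
     * Returns true only if all three jobs succeed.
     */
    public static boolean runChain(Job job1, Job job2, Job job3)
            throws IOException, InterruptedException {

        ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        cJob1.setJob(job1);
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        cJob2.setJob(job2);
        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
        cJob3.setJob(job3);

        // job3 may only start after job1 and job2 have completed successfully
        cJob3.addDependingJob(cJob1);
        cJob3.addDependingJob(cJob2);

        JobControl control = new JobControl("avg-chain");
        control.addJob(cJob1);
        control.addJob(cJob2);
        control.addJob(cJob3);

        // JobControl implements Runnable: run it in its own thread and poll
        // until every job in the group has finished
        Thread controller = new Thread(control);
        controller.start();
        while (!control.allFinished()) {
            Thread.sleep(500);
        }
        control.stop();

        return control.getFailedJobList().isEmpty();
    }
}

One practical difference of this approach is that JobControl can submit job1 and job2 concurrently, since neither depends on the other, while the if-condition in Avg2.main() runs them strictly one after the other.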
