Hadoop Series: Classic MapReduce Code Examples, Part 1

7. Classic MapReduce Examples

1. Website analytics example

1) Analysis

Visits by province

provinceId  --> key
1           --> value
<provinceId, list(1,1,1,1,1,...)>

Database:

Dimension table:

tb_province_info

provinceId

provinceName

provinceXxx

江苏省 -> 2098

上海市 -> 34563
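To make the flow concrete, here is a hypothetical trace (the IDs and counts are invented for illustration): each map call emits (provinceId, 1), the shuffle groups the 1s per provinceId, and each reduce call sums its list; the province name is then looked up in the dimension table when reporting:

map output:    (32, 1) (32, 1) (9, 1) (32, 1)
shuffle:       <32, list(1,1,1)>   <9, list(1)>
reduce output: (32, 3) (9, 1)   --> joined with tb_province_info for the names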

2) Program

i. Define the Mapper class and its map method

ii. Define the Reducer class and its reduce method

iii. Define the run method

iv. Define the main method

v. Set up counters (inside the Mapper class); a minimal sketch of these steps follows
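A minimal sketch of steps i, ii, and v, under the assumption that the provinceId is the first tab-separated field of each log line; the package, class, and counter names are illustrative, not from the original post, and the driver wiring of steps iii and iv would mirror the main method of the SecondarySort job shown later:

package com.hadoop.mr.province;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ProvinceCount {

    static class ProvinceMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final IntWritable provinceId = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            try {
                // Assumption: provinceId sits in the first column of the log line.
                provinceId.set(Integer.parseInt(fields[0]));
                context.write(provinceId, ONE);
                // Step v: a counter maintained inside the Mapper class.
                context.getCounter("province", "valid").increment(1);
            } catch (NumberFormatException e) {
                context.getCounter("province", "invalid").increment(1);
            }
        }
    }

    static class ProvinceReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // Emits <provinceId, visitCount>; names come from the dimension table downstream.
            context.write(key, new IntWritable(sum));
        }
    }
}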

3) Export the jar and run it

i. Package with Eclipse

ii. Run via the YARN command

$ bin/yarn jar .....
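A fully spelled-out invocation might look like the following; the jar name, main class, and paths are hypothetical:

$ bin/yarn jar province-count.jar com.hadoop.mr.province.ProvinceCount /logs/access /out/province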

2. Secondary sort

http://blog.csdn.net/u014729236/article/details/46327335

1) The IntPair class

package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {

    private IntWritable first;
    private IntWritable second;

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    // Note: the no-argument constructor is required; without it,
    // Hadoop's reflection-based instantiation fails.
    public IntPair() {
        set(new IntWritable(), new IntWritable());
    }

    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    public IntWritable getFirst() {
        return first;
    }

    public void setFirst(IntWritable first) {
        this.first = first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public void setSecond(IntWritable second) {
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(IntPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

2) The SecondarySort class

package com.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {

    static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            int field1 = Integer.parseInt(fields[0]);
            int field2 = Integer.parseInt(fields[1]);
            context.write(new IntPair(field1, field2), NullWritable.get());
        }
    }

    static class TheReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        //private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst().get()) % numPartitions;
        }
    }

    // Without this class both columns sort ascending by default. Its job is to
    // sort the first column ascending and the second column descending.
    public static class KeyComparator extends WritableComparator {
        // The no-arg constructor is required; omitting it causes an error.
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            // First column ascending.
            int cmp = ip1.getFirst().compareTo(ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            // When the first column is equal, second column descending.
            return -ip1.getSecond().compareTo(ip2.getSecond());
        }
    }

/*  public static class GroupComparator extends WritableComparator {
        // The no-arg constructor is required; omitting it causes an error.
        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }*/

    // Entry point.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);

        // Mapper settings.
        job.setMapperClass(TheMapper.class);
        // When the Mapper's output key/value types match the Reducer's output
        // types, the next two lines can be omitted.
        //job.setMapOutputKeyClass(IntPair.class);
        //job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Partitioning settings.
        job.setPartitionerClass(FirstPartitioner.class);
        // Sorts keys on the map side.
        job.setSortComparatorClass(KeyComparator.class);
        //job.setGroupingComparatorClass(GroupComparator.class);

        // Reducer settings.
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Number of reducers.
        int reduceNum = 1;
        if (args.length >= 3 && args[2] != null) {
            reduceNum = Integer.parseInt(args[2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

3) Test

Package the job as secsort.jar. The input data comes from /test/secsortdata on HDFS, the MapReduce output directory is /test/secsortresult8, and one reducer is started:

hadoop jar secsort.jar /test/secsortdata /test/secsortresult8 1

Test results: (the screenshot from the original post is not reproduced here.)
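For illustration, given a hypothetical tab-separated input like this:

4	3
3	98
3	21
7	4
3	55

a single-reducer run would produce the following output: the first column ascending and, within equal first values, the second column descending. Exact duplicate pairs would collapse to a single output line, since the reducer writes each distinct key once:

3	98
3	55
3	21
4	3
7	4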

3. Secondary sort (variant 2)

1) The IntPair class

package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {

    private int first = 0;
    private int second = 0;

    public void set(int first, int second) {
        this.first = first;
        this.second = second;
    }

    // Note: the no-argument constructor is required; without it,
    // Hadoop's reflection-based instantiation fails.
    public IntPair() {
    }

    public IntPair(int first, int second) {
        set(first, second);
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // writeInt, not write: DataOutput.write(int) emits a single byte,
        // which would not round-trip with readInt() in readFields.
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int hashCode() {
        // Combine both fields so that swapped pairs hash differently.
        return first * 157 + second;
    }

    @Override
    public boolean equals(Object right) {
        if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            return r.getFirst() == first && r.getSecond() == second;
        } else {
            return false;
        }
    }

    // Used by TextOutputFormat when the pair is written as output.
    @Override
    public String toString() {
        return first + "\t" + second;
    }

    // This compareTo is the key piece: it is what the framework calls when
    // sorting keys (first column ascending, second column descending).
    @Override
    public int compareTo(IntPair o) {
        if (first != o.getFirst()) {
            // Integer.compare avoids the overflow risk of "first - o.getFirst()".
            return Integer.compare(first, o.getFirst());
        }
        // Reversed arguments give descending order on the second column.
        return Integer.compare(o.getSecond(), second);
    }
}

2) The SecondarySort class

package com.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {

    static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            int field1 = Integer.parseInt(fields[0]);
            int field2 = Integer.parseInt(fields[1]);
            context.write(new IntPair(field1, field2), NullWritable.get());
        }
    }

    static class TheReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        //private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            // getFirst() returns a primitive int in this variant, so no .get() call.
            return Math.abs(key.getFirst()) % numPartitions;
        }
    }

    // Without this class both columns sort ascending by default. It sorts the
    // first column ascending and the second descending. In this variant it is
    // strictly redundant, since IntPair.compareTo already defines the same order.
    public static class KeyComparator extends WritableComparator {
        // The no-arg constructor is required; omitting it causes an error.
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            // First column ascending (primitive ints, hence Integer.compare).
            int cmp = Integer.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            // When the first column is equal, second column descending.
            return Integer.compare(ip2.getSecond(), ip1.getSecond());
        }
    }

/*  public static class GroupComparator extends WritableComparator {
        // The no-arg constructor is required; omitting it causes an error.
        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return Integer.compare(ip1.getFirst(), ip2.getFirst());
        }
    }*/

    // Entry point.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);

        // Mapper settings.
        job.setMapperClass(TheMapper.class);
        // When the Mapper's output key/value types match the Reducer's output
        // types, the next two lines can be omitted.
        //job.setMapOutputKeyClass(IntPair.class);
        //job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Partitioning settings.
        job.setPartitionerClass(FirstPartitioner.class);
        // Sorts keys on the map side.
        job.setSortComparatorClass(KeyComparator.class);
        //job.setGroupingComparatorClass(GroupComparator.class);

        // Reducer settings.
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Number of reducers.
        int reduceNum = 1;
        if (args.length >= 3 && args[2] != null) {
            reduceNum = Integer.parseInt(args[2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

PS: Secondary sort in Scala (Spark)

package com.spark.secondApp

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySort {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Secondary Sort").setMaster("local")
    val sc = new SparkContext(conf)
    val file = sc.textFile("hdfs://worker02:9000/test/secsortdata")

    // Group the second column by the first, sort keys ascending, then sort each
    // group descending. Note: both columns are compared as strings here, not as
    // integers, so for example "9" sorts after "100".
    val rdd = file.map(line => line.split("\t")).
      map(x => (x(0), x(1))).groupByKey().
      sortByKey(true).map(x => (x._1, x._2.toList.sortWith(_ > _)))

    // Flatten each (key, sorted list) back into one pair per output line.
    val rdd2 = rdd.flatMap {
      x =>
        val len = x._2.length
        val array = new Array[(String, String)](len)
        for (i <- 0 until len) {
          array(i) = (x._1, x._2(i))
        }
        array
    }

    // Transformations are lazy, so without an action nothing above actually runs.
    // Printing the result is one option; saving to an HDFS path with
    // saveAsTextFile would be another.
    rdd2.collect().foreach(println)

    sc.stop()
  }
}

Recommended reading:

1. Hadoop series: fundamentals

2. Hadoop series: in-depth optimization

A follow-up post will cover classic MapReduce join examples.

Originally published on the WeChat public account Spark学习技巧 (bigdatatip) on 2017-10-16.
