前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Hadoop MapReduce 二次排序原理及其应用

Hadoop MapReduce 二次排序原理及其应用

作者头像
用户1177713
发布于 2018-02-24 06:53:10
发布于 2018-02-24 06:53:10
1.2K00
代码可运行
举报
文章被收录于专栏:数据之美数据之美
运行总次数:0
代码可运行

关于二次排序主要涉及到这么几个东西:

在0.20.0 以前使用的是

setPartitionerClass

setOutputkeyComparatorClass

setOutputValueGroupingComparator

 在0.20.0以后使用是

job.setPartitionerClass(Partitioner p);

job.setSortComparatorClass(RawComparator c);

job.setGroupingComparatorClass(RawComparator c);

下面的例子里面只用到了 setGroupingComparatorClass

http://blog.csdn.net/heyutao007/article/details/5890103 

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。  这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)  public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> 

1、首先说一下工作原理: 

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的字节偏移量作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。 如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。 在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。  在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。 

2、二次排序

就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序 的结果 。例如 :

echo "3 b 1 c 2 a 1 d 3 a"|sort -k1 -k2 1 c 1 d 2 a 3 a 3 b

3、具体步骤: 

1 自定义key。 

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。  所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法  //反序列化,从流中的二进制转换成IntPair  public void readFields(DataInput in) throws IOException  //序列化,将IntPair转化成使用流传送的二进制  public void write(DataOutput out)  //key的比较  public int compareTo(IntPair o)  另外新定义的类应该重写的两个方法  //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)  public int hashCode()  public boolean equals(Object right) 

2 由于key是自定义的,所以还需要自定义一下类: 

2.1 分区函数类。这是key的第一次比较。 

public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>  在job中设置使用setPartitionerClasss  2.2 key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。  public static class KeyComparator extends WritableComparator  必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)  另一种方法是 实现接口RawComparator。  在job中设置使用setSortComparatorClass。  2.3 分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。  public static class GroupingComparator extends WritableComparator  同key比较函数类,必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)  同key比较函数类,分组函数类另一种方法是实现接口RawComparator。  在job中设置使用setGroupingComparatorClass。  另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。 

4 代码:

这个例子中没有使用key比较函数类,而是使用key的实现的compareTo方法:

代码语言:javascript
代码运行次数:0
运行
复制
package SecondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SecondarySort
{
    //自己定义的key类应该实现WritableComparable接口
    public static class IntPair implements WritableComparable<IntPair>
    {
        String first;
        String second;
        /**
         * Set the left and right values.
         */
        public void set(String left, String right)
        {
            first = left;
            second = right;
        }
        public String getFirst()
        {
            return first;
        }
        public String getSecond()
        {
            return second;
        }
        //反序列化,从流中的二进制转换成IntPair
        public void readFields(DataInput in) throws IOException
        {
            first = in.readUTF();
            second = in.readUTF();
        }
        //序列化,将IntPair转化成使用流传送的二进制
        public void write(DataOutput out) throws IOException
        {
            out.writeUTF(first);
            out.writeUTF(second);
        }
        //重载 compareTo 方法,进行组合键 key 的比较,该过程是默认行为。
        //分组后的二次排序会隐式调用该方法。
        public int compareTo(IntPair o)
        {
            if (!first.equals(o.first) )
            {
                return first.compareTo(o.first);
            }
            else if (!second.equals(o.second))
            {
                return second.compareTo(o.second);
            }
            else
            {
                return 0;
            }
        }

        //新定义类应该重写的两个方法
        //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
        public int hashCode()
        {
            return first.hashCode() * 157 + second.hashCode();
        }
        public boolean equals(Object right)
        {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair)
            {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second) ;
            }
            else
            {
                return false;
            }
        }
    }
    /**
      * 分区函数类。根据first确定Partition。
      */
    public static class FirstPartitioner extends Partitioner<IntPair, Text>
    {
        public int getPartition(IntPair key, Text value,int numPartitions)
        {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * 分组函数类。只要first相同就属于同一个组。
     */
    /*//第一种方法,实现接口RawComparator
    public static class GroupingComparator implements RawComparator<IntPair> {
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
        //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
             return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                     b2, s2, Integer.SIZE/8);
        }
    }*/
    //第二种方法,继承WritableComparator
    public static class GroupingComparator extends WritableComparator
    {
        protected GroupingComparator()
        {
            super(IntPair.class, true);
        }
        //Compare two WritableComparables.
        //  重载 compare:对组合键按第一个自然键排序分组
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }


    // 自定义map
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text>
    {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            lineArr = line.split("\t", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }
    // 自定义reduce
    //
    public static class Reduce extends Reducer<IntPair, Text, Text, Text>
    {
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        
        public void reduce(IntPair key, Iterable<Text> values,Context context) throws IOException, InterruptedException
        {
            context.write(SEPARATOR, null);
            for (Text val : values)
            {
                context.write(null, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
    {
        // 读取hadoop配置
        Configuration conf = new Configuration();
        // 实例化一道作业
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        // Mapper类型
        job.setMapperClass(Map.class);
        // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
        //job.setCombinerClass(Reduce.class);
        // Reducer类型
        job.setReducerClass(Reduce.class);
        // 分区函数
        job.setPartitionerClass(FirstPartitioner.class);
        // 分组函数
        job.setGroupingComparatorClass(GroupingComparator.class);

        // map 输出Key的类型
        job.setMapOutputKeyClass(IntPair.class);
        // map输出Value的类型
        job.setMapOutputValueClass(Text.class);
        // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // rduce输出Value的类型
        job.setOutputValueClass(Text.class);

        // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出。
        job.setOutputFormatClass(TextOutputFormat.class);

        // 输入hdfs路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 输出hdfs路径
        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

5 测试需求:

假如我们现在的需求是先按 cookieId 排序,然后按 time 排序,以便按 session 切分日志

6 测试数据与结果:

代码语言:javascript
代码运行次数:0
运行
复制
cookieId	time	url
2	12:12:34	2_hao123
3	09:10:34	3_baidu
1	15:02:41	1_google
3	22:11:34	3_sougou
1	19:10:34	1_baidu
2	15:02:41	2_google
1	12:12:34	1_hao123
3	23:10:34	3_soso
2	05:02:41	2_google

结果:
------------------------------------------------
1       12:12:34        1_hao123
1       15:02:41        1_google
1       19:10:34        1_baidu
------------------------------------------------
2       05:02:41        2_google
2       12:12:34        2_hao123
2       15:02:41        2_google
------------------------------------------------
3       09:10:34        3_baidu
3       22:11:34        3_sougou
3       23:10:34        3_soso

7 原理图(点击查看大图):

8、推荐阅读:

hive中使用标准sql实现分组内排序

http://superlxw1234.iteye.com/blog/1869612

Pig、Hive、MapReduce 解决分组 Top K 问题

http://my.oschina.net/leejun2005/blog/85187

9、REF:

mapreduce的二次排序 SecondarySort

http://blog.csdn.net/zyj8170/article/details/7530728

学会定制MapReduce里的partition,sort和grouping,Secondary Sort Made Easy 进行二次排序

http://blog.sina.com.cn/s/blog_9bf980ad0100zk7r.html

Simple Moving Average, Secondary Sort, and MapReduce (Part 3)

http://blog.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/

https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage

MapReduce的排序和二次排序原理总结

http://hugh-wangp.iteye.com/blog/1491175

泛型value的二次排序 

http://wenku.baidu.com/view/a3826a235901020207409c47.html

http://vangjee.wordpress.com/2012/03/20/secondary-sorting-aka-sorting-values-in-hadoops-mapreduce-programming-paradigm/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
【Python篇】PyQt5 超详细教程——由入门到精通(终篇)
在 PyQt5 中,菜单栏(QMenuBar)、工具栏(QToolBar)和状态栏(QStatusBar)是 QMainWindow 提供的标准控件,用于帮助用户更好地与应用程序交互。它们是桌面应用程序的常见组成部分:
半截诗
2024/10/09
2.2K0
【Python篇】PyQt5 超详细教程——由入门到精通(终篇)
【Python篇】PyQt5 超详细教程——由入门到精通(中篇二)
【Python篇】PyQt5 超详细教程——由入门到精通(序篇) 【Python篇】PyQt5 超详细教程——由入门到精通(中篇一)
半截诗
2024/10/09
1.3K0
【Python篇】PyQt5 超详细教程——由入门到精通(中篇二)
Qt入门系列(二)
选中mainWindow.cpp右键->在Explor中显示->粘贴进来(显示包含的目录)
用户9831583
2022/06/16
2K0
Qt入门系列(二)
PyQt5案例汇总(完整版)
PyQt5是一套绑定Qt5的应用程序框架。他在Python 2.x和3.x中都是可用的。该教程使用的是Python3.x。
victorfengming
2021/11/26
5.2K0
【Python篇】PyQt5 超详细教程——由入门到精通(序篇)
PyQt5 是 Python 的图形用户界面 (GUI) 框架,它基于强大的 Qt 库。Qt 是一个跨平台的 C++ 框架,用于构建桌面应用程序。通过 PyQt5,我们可以用 Python 轻松构建跨平台的桌面应用程序,支持 Windows、macOS 和 Linux。
半截诗
2024/10/09
17.9K0
【Python篇】PyQt5 超详细教程——由入门到精通(序篇)
【QT】Qt 窗口 (QMainWindow)
QMainWindow 是一个为用户提供主窗口程序的类,继承自 QWidget 类,并且提供了一个预定义的布局。QMainWindow 包含一个菜单栏(menu bar)、多个工具栏(tool bars)、多个浮动窗口(铆接部件)(dock widgets)、⼀个状态栏(status bar) 和⼀个 中心部件(central widget),它是许多应用程序的基础,如文本编辑器,图片编辑器等。如下图为 QMainwindow 中 各组件所处的位置:
YoungMLet
2024/07/16
4880
【QT】Qt 窗口 (QMainWindow)
PyQt5 基本窗口控件(绘图类 / 拖拽 / 剪贴板 / 日历时间 / 菜单、工具、状态栏 / 打印)
learn from 《PyQt5 快速开发与实战》 https://doc.qt.io/qtforpython/index.html https://www.riverbankcomputing.com/static/Docs/PyQt5
Michael阿明
2022/05/10
1.9K0
PyQt5 基本窗口控件(绘图类 / 拖拽 / 剪贴板 / 日历时间 / 菜单、工具、状态栏 / 打印)
在 PyQt4 中的菜单和工具栏¶
QtGui.QMainWindow 类提供了一个应用的主窗口。这使得我们可以创建典型的应用框架,包括状态栏,工具栏和菜单。
bear_fish
2018/09/20
6310
在 PyQt4 中的菜单和工具栏¶
http://www.cppblog.com/mirguest/archive/2012/02/05/164982.html
bear_fish
2018/09/20
6070
PyQt5 布局管理(水平、垂直、网格、表单、嵌套、QSplitter)
learn from 《PyQt5 快速开发与实战》 https://doc.qt.io/qtforpython/index.html https://www.riverbankcomputing.com/static/Docs/PyQt5
Michael阿明
2022/06/06
4.7K0
PyQt5 布局管理(水平、垂直、网格、表单、嵌套、QSplitter)
【QT】:QMainWindow 窗口
Qt 中的菜单栏是通过 QMenuBar 这个类来实现的。一个主窗口最多只有一个菜单栏,位于主窗口顶部、主窗口标题栏下面。
IsLand1314
2025/02/18
2140
【QT】:QMainWindow 窗口
Python Qt GUI设计:窗口布局管理方法【强化篇】(基础篇—6)
在Python Qt GUI设计:窗口布局管理方法【基础篇】(基础篇—5)文章中,聊到了如何使用Qt Designer进行窗口布局管理,其实在Qt Designer中可以非常方便进行窗口布局管理设计,本篇博文在4种窗口布局方式基础上继续深入聊聊API函数~
不脱发的程序猿
2021/10/09
4.7K0
Python Qt GUI设计:窗口布局管理方法【强化篇】(基础篇—6)
PyQt5 基本窗口控件(状态栏/窗口/图标/提示消息/QLabel/文本类控件)
文章目录 1. 状态栏 2. 窗口居中显示 3. 关闭窗口 4. QWidget 5. 添加图标 6. 气泡提示信息 7. QLabel 添加快捷键 8. QLineEdit echoMode 验证器 inputMask 综合练习 9. QTextEdit learn from 《PyQt5 快速开发与实战》 1. 状态栏 self.statusbar.showMessage("hello, Michael", 2000),第二个参数是显示多长时间ms,默认无限长时间 # -*- coding: utf-8
Michael阿明
2022/05/10
2.6K0
PyQt5 基本窗口控件(状态栏/窗口/图标/提示消息/QLabel/文本类控件)
QT入门学习——从信号和槽到对话框,各种控件以及自定义控件
2.4默认创建有窗口类,yWidget,基类有三种选择:QWidget、QMainWindowQDialog
Arya
2024/10/31
5510
QT入门学习——从信号和槽到对话框,各种控件以及自定义控件
【Qt】:Dialog 对话框
模态对话框 指的是:显示后无法与父窗口进行交互,是⼀种阻塞式的对话框。使用 QDialog:: exec () 函数调用。
IsLand1314
2025/02/28
1620
【Qt】:Dialog 对话框
PyQt5 入门
我的路径是:D:\ProgramData\Anaconda3\envs\cv\Lib\site-packages\qt5_applications\Qt\bin\designer.exe
Michael阿明
2022/05/10
1.5K0
PyQt5 入门
PYQT中的菜单和工具栏
在这部分的PyQt5教程中,我们将创建菜单和工具栏。菜单式位于菜单栏的一组命令操作。工具栏是应用窗体中由按钮和一些常规命令操作组成的组件。
小飞侠xp
2018/12/24
2.7K0
PYQT中的菜单和工具栏
学习 QT 过程中的一些笔记
学习 QT 时候做的一些笔记 目录 目录 按钮常用控件 API QT 窗口体系 信号和槽 点击按钮关闭窗口 自定义信号和槽 当自定义信号和槽出现重载 无参信号和有参信号与槽连接 Lambda 表达式 利用 Lambda 表达式实现点击按钮关闭窗口 QMainWindow 菜单栏 工具栏 状态栏 标签控件 浮动窗口 核心部件 资源文件 对话框 话框分类: 标准对话框 其他标准对话框 界面布局 控件 按钮组 QListWidget 列表容器 按钮常用控件 API QPushButton * btn = new
HauHau
2022/01/12
1.2K0
QT Creator 快速入门教程 读书笔记(二)
一 窗口部件 基础窗口部件QWidget类是所有用户界面对象的基类,窗口和控件都是直接或间接继承自 QWidget,下面我们来看一个很简单的例子: 窗口部件(Widget)简称部件,是QT中建立界面的
用户1198337
2018/01/19
1.3K0
QT Creator 快速入门教程 读书笔记(二)
PyQt5-Qt Designer中控件的尺寸相关设置(sizePolicy策略)
虫无涯
2023/10/09
3K0
推荐阅读
相关推荐
【Python篇】PyQt5 超详细教程——由入门到精通(终篇)
更多 >
目录
  • 1、首先说一下工作原理: 
  • 2、二次排序
  • 3、具体步骤: 
    • 1 自定义key。 
    • 2 由于key是自定义的,所以还需要自定义一下类: 
  • 4 代码:
  • 5 测试需求:
  • 6 测试数据与结果:
  • 7 原理图(点击查看大图):
  • 8、推荐阅读:
  • 9、REF:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档