hadoop系列之MR经典案例分享二

4、MapReduce的join(hive已经实现)

http://database.51cto.com/art/201410/454277.htm

这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据倾斜降到最低,使我们的代码倾向性更好

1)在Reudce端进行连接(最常见)

  • Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
  • Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。

原理非常简单,下面来看一个实例:

自定义一个value返回类型:

MapReduce主体

其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2)Mapduanjoin

使用场景:一张表十分小、一张表很大

  • 先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。
  • 扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

将小表的数据读取出来,存放在内存中

右键 -> Source -> Override/Implement Methods -> setup+cleanup

案例

package com.mr.mapSideJoin;


import java.io.BufferedReader;   

import java.io.FileReader;   

import java.io.IOException;   

import java.util.HashMap;   

import org.apache.hadoop.conf.Configuration;   

import org.apache.hadoop.conf.Configured;   

import org.apache.hadoop.filecache.DistributedCache;   

import org.apache.hadoop.fs.Path;   

import org.apache.hadoop.io.Text;   

import org.apache.hadoop.mapreduce.Job;   

import org.apache.hadoop.mapreduce.Mapper;   

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   

import org.apache.hadoop.util.Tool;   

import org.apache.hadoop.util.ToolRunner;   

import org.slf4j.Logger;   

import org.slf4j.LoggerFactory; 



/**   

 * @author zengzhaozheng   

 *   

 * 用途说明:   

 * Map side join中的left outer join   

 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   

 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   

 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":   

 * id     name  orderid  city_code  is_show   

 * 0       其他        9999     9999         0   

 * 1       长春        1        901          1   

 * 2       吉林        2        902          1   

 * 3       四平        3        903          1   

 * 4       松原        4        904          1   

 * 5       通化        5        905          1   

 * 6       辽源        6        906          1   

 * 7       白城        7        907          1   

 * 8       白山        8        908          1   

 * 9       延吉        9        909          1   

 * -------------------------风骚的分割线-------------------------------   

 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   

 * tb_user_profiles.dat文件内容,分隔符为"|":   

 * userID   network     flow    cityID   

 * 1           2G       123      1   

 * 2           3G       333      2   

 * 3           3G       555      1   

 * 4           2G       777      3   

 * 5           3G       666      4   

 * -------------------------风骚的分割线-------------------------------   

 *  结果:   

 *  1   长春  1   901 1   1   2G  123   

 *  1   长春  1   901 1   3   3G  555   

 *  2   吉林  2   902 1   2   3G  333   

 *  3   四平  3   903 1   4   2G  777   

 *  4   松原  4   904 1   5   3G  666   

 */ 

public class MapSideJoinMain extends Configured implements Tool{   

    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class); 

  

    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

 

        private HashMap<String,String> city_info = new HashMap<String, String>();   

        private Text outPutKey = new Text();   

        private Text outPutValue = new Text();   

        private String mapInputStr = null;   

        private String mapInputSpit[] = null;   

        private String city_secondPart = null; 

  

        /**   

         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache   

         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。   

         */ 

        @Override 

        protected void setup(Context context) throws IOException, InterruptedException {   

            BufferedReader br = null;   

            //获得当前作业的DistributedCache相关文件   

            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   

            String cityInfo = null;   

            for(Path p : distributePaths){   

                if(p.toString().endsWith("tb_dim_city.dat")){   

                    //读缓存文件,并放到mem中   

                    br = new BufferedReader(new FileReader(p.toString()));   

                    while(null!=(cityInfo=br.readLine())){   

                        String[] cityPart = cityInfo.split("\\|",5);   

                        if(cityPart.length ==5){   

                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   

                        }   

                    }   

                }   

            }   

        }

 

        /**   

         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的   

         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了   

         */ 

        @Override 

        protected void map(Object key, Text value, Context context)   

                throws IOException, InterruptedException {   

            //排掉空行   

            if(value == null || value.toString().equals("")){   

                return;   

            }   

            mapInputStr = value.toString();   

            mapInputSpit = mapInputStr.split("\\|",4);   

            //过滤非法记录   

            if(mapInputSpit.length != 4){   

                return;   

            }   

            //判断链接字段是否在map中存在   

            city_secondPart = city_info.get(mapInputSpit[3]);   

            if(city_secondPart != null){   

                this.outPutKey.set(mapInputSpit[3]);   

                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   

                context.write(outPutKey, outPutValue);   

            }   

        }   

    }   



    @Override 

    public int run(String[] args) throws Exception {   

            Configuration conf=getConf(); //获得配置文件对象   

            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件   

            Job job=new Job(conf,"MapJoinMR");   

            job.setNumReduceTasks(0);

 

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   

            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径

 

            job.setJarByClass(MapSideJoinMain.class);   

            job.setMapperClass(LeftOutJoinMapper.class);

 

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   

            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

 

            //设置map的输出key和value类型   

            job.setMapOutputKeyClass(Text.class);

 

            //设置reduce的输出key和value类型   

            job.setOutputKeyClass(Text.class);   

            job.setOutputValueClass(Text.class);   

            job.waitForCompletion(true);   

            return job.isSuccessful()?0:1;   

    }   



    public static void main(String[] args) throws IOException,   

            ClassNotFoundException, InterruptedException {   

        try {   

            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   

            System.exit(returnCode);   

        } catch (Exception e) {   

            // TODO Auto-generated catch block   

            logger.error(e.getMessage());   

        }   

    }   

} 

DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

3)semi join

就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。

说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。

package com.mr.SemiJoin;   



import java.io.BufferedReader;   

import java.io.FileReader;   

import java.io.IOException;   

import java.util.ArrayList;   

import java.util.HashSet;   

import org.apache.hadoop.conf.Configuration;   

import org.apache.hadoop.conf.Configured;   

import org.apache.hadoop.filecache.DistributedCache;   

import org.apache.hadoop.fs.Path;   

import org.apache.hadoop.io.Text;   

import org.apache.hadoop.mapreduce.Job;   

import org.apache.hadoop.mapreduce.Mapper;   

import org.apache.hadoop.mapreduce.Reducer;   

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   

import org.apache.hadoop.mapreduce.lib.input.FileSplit;   

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   

import org.apache.hadoop.util.Tool;   

import org.apache.hadoop.util.ToolRunner;   

import org.slf4j.Logger;   

import org.slf4j.LoggerFactory;   



/**   

 * @author zengzhaozheng   

 *   

 * 用途说明:   

 * reudce side join中的left outer join   

 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   

 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   

 * tb_dim_city.dat文件内容,分隔符为"|":   

 * id     name  orderid  city_code  is_show   

 * 0       其他        9999     9999         0   

 * 1       长春        1        901          1   

 * 2       吉林        2        902          1   

 * 3       四平        3        903          1   

 * 4       松原        4        904          1   

 * 5       通化        5        905          1   

 * 6       辽源        6        906          1   

 * 7       白城        7        907          1   

 * 8       白山        8        908          1   

 * 9       延吉        9        909          1   

 * -------------------------风骚的分割线-------------------------------   

 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   

 * tb_user_profiles.dat文件内容,分隔符为"|":   

 * userID   network     flow    cityID   

 * 1           2G       123      1   

 * 2           3G       333      2   

 * 3           3G       555      1   

 * 4           2G       777      3   

 * 5           3G       666      4   

 * -------------------------风骚的分割线-------------------------------   

 * joinKey.dat内容:   

 * city_code   

 * 1   

 * 2   

 * 3   

 * 4   

 * -------------------------风骚的分割线-------------------------------   

 *  结果:   

 *  1   长春  1   901 1   1   2G  123   

 *  1   长春  1   901 1   3   3G  555   

 *  2   吉林  2   902 1   2   3G  333   

 *  3   四平  3   903 1   4   2G  777   

 *  4   松原  4   904 1   5   3G  666   

 */ 

public class SemiJoin extends Configured implements Tool{   

    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   

    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   

        private CombineValues combineValues = new CombineValues();   

        private HashSet<String> joinKeySet = new HashSet<String>();   

        private Text flag = new Text();   

        private Text joinKey = new Text();   

        private Text secondPart = new Text();   

        /**   

         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b   

         */ 

        @Override 

        protected void setup(Context context) throws IOException, InterruptedException {   

            BufferedReader br = null;   

            //获得当前作业的DistributedCache相关文件   

            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());

            String joinKeyStr = null;   

            for(Path p : distributePaths){   

                if(p.toString().endsWith("joinKey.dat")){   

                    //读缓存文件,并放到mem中   

                    br = new BufferedReader(new FileReader(p.toString()));   

                    while(null!=(joinKeyStr=br.readLine())){   

                        joinKeySet.add(joinKeyStr);   

                    }   

                }   

            }   

        }   



        @Override 

        protected void map(Object key, Text value, Context context)   

                throws IOException, InterruptedException {   

            //获得文件输入路径   

            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   

            //数据来自tb_dim_city.dat文件,标志即为"0"   

            if(pathName.endsWith("tb_dim_city.dat")){   

                String[] valueItems = value.toString().split("\\|");   

                //过滤格式错误的记录   

                if(valueItems.length != 5){   

                    return;   

                }   

                //过滤掉不需要参加join的记录   

                if(joinKeySet.contains(valueItems[0])){   

                    flag.set("0");   

                    joinKey.set(valueItems[0]);   

                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   

                    combineValues.setFlag(flag);   

                    combineValues.setJoinKey(joinKey);   

                    combineValues.setSecondPart(secondPart);   

                    context.write(combineValues.getJoinKey(), combineValues);   

                }else{   

                    return ;   

                }   

            }//数据来自于tb_user_profiles.dat,标志即为"1"   

            else if(pathName.endsWith("tb_user_profiles.dat")){   

                String[] valueItems = value.toString().split("\\|");   

                //过滤格式错误的记录   

                if(valueItems.length != 4){   

                    return;   

                }   

                //过滤掉不需要参加join的记录   

                if(joinKeySet.contains(valueItems[3])){   

                    flag.set("1");   

                    joinKey.set(valueItems[3]);   

                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   

                    combineValues.setFlag(flag);   

                    combineValues.setJoinKey(joinKey);   

                    combineValues.setSecondPart(secondPart);   

                    context.write(combineValues.getJoinKey(), combineValues);   

                }else{   

                    return ;   

                }   

            }   

        }   

    }   



    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   

        //存储一个分组中的左表信息   

        private ArrayList<Text> leftTable = new ArrayList<Text>();   

        //存储一个分组中的右表信息   

        private ArrayList<Text> rightTable = new ArrayList<Text>();   

        private Text secondPar = null;   

        private Text output = new Text();   



        /**   

         * 一个分组调用一次reduce函数   

         */ 

        @Override 

        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   

                throws IOException, InterruptedException {   

            leftTable.clear();   

            rightTable.clear();   

            /**   

             * 将分组中的元素按照文件分别进行存放   

             * 这种方法要注意的问题:   

             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   

             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最   

             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。   

             */ 

            for(CombineValues cv : value){   

                secondPar = new Text(cv.getSecondPart().toString());   

                //左表tb_dim_city   

                if("0".equals(cv.getFlag().toString().trim())){   

                    leftTable.add(secondPar);   

                }   

                //右表tb_user_profiles   

                else if("1".equals(cv.getFlag().toString().trim())){   

                    rightTable.add(secondPar);   

                }   

            }   

            logger.info("tb_dim_city:"+leftTable.toString());   

            logger.info("tb_user_profiles:"+rightTable.toString());   

            for(Text leftPart : leftTable){   

                for(Text rightPart : rightTable){   

                    output.set(leftPart+ "\t" + rightPart);   

                    context.write(key, output);   

                }   

            }   

        }   

    }   



    @Override 

    public int run(String[] args) throws Exception {   

            Configuration conf=getConf(); //获得配置文件对象   

            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

            Job job=new Job(conf,"LeftOutJoinMR");   

            job.setJarByClass(SemiJoin.class);

 

            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   

            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径

 

            job.setMapperClass(SemiJoinMapper.class);   

            job.setReducerClass(SemiJoinReducer.class);

 

            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   

            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式

 

            //设置map的输出key和value类型   

            job.setMapOutputKeyClass(Text.class);   

            job.setMapOutputValueClass(CombineValues.class);

 

            //设置reduce的输出key和value类型   

            job.setOutputKeyClass(Text.class);   

            job.setOutputValueClass(Text.class);   

            job.waitForCompletion(true);   

            return job.isSuccessful()?0:1;   

    }   

    public static void main(String[] args) throws IOException,   

            ClassNotFoundException, InterruptedException {   

        try {   

            int returnCode =  ToolRunner.run(new SemiJoin(),args);   

            System.exit(returnCode);   

        } catch (Exception e) {   

            logger.error(e.getMessage());   

        }   

    }   

} 

承接子推荐阅读:

1, hadoop系列之基础系列

2,hadoop系列之深入优化

3,hadoop系列之MR的经典代码案例一

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-10-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Flutter入门

Weex是如何在Android客户端上跑起来的

Weex可以通过自己设计的DSL,书写.we文件或者.vue文件来开发界面,整个页面书写分成了3段,template、style、script,借鉴了成熟的MV...

4595
来自专栏cmazxiaoma的架构师之路

通用Mapper和PageHelper插件 学习笔记

1K3
来自专栏编码小白

tomcat请求处理分析(六)servlet的处理过程

1.1.1.1  servlet的解析过程 servlet的解析分为两步实现,第一个是匹配到对应的Wrapper,第二个是加载对应的servlet并进行数据,这...

7197
来自专栏恰童鞋骚年

Hadoop学习笔记—4.初识MapReduce

  MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。但对许多开...

932
来自专栏Pythonista

Django之ORM数据库

            django默认使用sqlite的数据库,默认自带sqlite的数据库驱动 , 引擎名称:django.db.backends.sqli...

1401
来自专栏刘望舒

LeakCanary看这一篇文章就够了

LeakCanary是Square公司基于MAT开源的一个内存泄漏检测工具,在发生内存泄漏的时候LeakCanary会自动显示泄漏信息。

2.3K5
来自专栏瓜大三哥

Yaffs_guts(三)

1.垃圾回收 1.static int yaffs_InitialiseBlocks(yaffs_Device *dev,int nBlocks)//块初始化 ...

2245
来自专栏about云

discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下h...

3628
来自专栏微服务那些事儿

关键数据变更监控

在经过了对mybatis的一番检索之后,没有发现对该需求的解决方式.在认知范围内,想到了使用mabatis拦截器解决该问题。

83819
来自专栏Java Web

JavaFX-TableView详解

前言 最近在着手一个学生管理系统的编写,涉及到TableView的使用,这前前后后的也有了些经验和想法想要记录和分享一下(事实上我正在想要用html网页代替界面...

4016

扫码关注云+社区

领取腾讯云代金券