MapReduce Join

Here are two pieces of code, a map-side join and a reduce-side join, that you can reuse directly in your own projects:

Map Side Join

In a map-side join, each mapper loads the small table from the distributed cache into an in-memory HashMap during setup() and then joins every record of the big table against it while scanning, so no shuffle or reduce phase is needed. It only works when the small table fits in memory.

package MapJoin;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 *
 Table1
 011990-99999    SIHCCAJAVRI
 012650-99999    TYNSET-HANSMOEN


 Table2
 012650-99999    194903241200    111
 012650-99999    194903241800    78
 011990-99999    195005150700    0
 011990-99999    195005151200    22
 011990-99999    195005151800    -11
 * */

public class MapJoin {
    static class mapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> Table1Map = new HashMap<String, String>();

        // Load the small table from the distributed cache into an in-memory HashMap
        protected void setup(Context context) throws IOException {
            URI[] paths = context.getCacheFiles();

            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataInputStream fsr = fs.open(new Path(paths[0].toString()));
            BufferedReader br = new BufferedReader(new InputStreamReader(fsr));
            String line = null;
            try {
                while ((line = br.readLine()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        Table1Map.put(vals[0], vals[1]);
                    }
                }
            } finally {
                br.close();
            }
        }

        // Scan the big table; each map call processes one record
        protected void map(LongWritable key, Text val, Context context)
                throws IOException, InterruptedException {
            String[] vals = val.toString().split("\\t");
            if (vals.length == 3) {
                // Probe the HashMap with the record's foreign key
                String Table1Vals = Table1Map.get(vals[0]);
                Table1Vals = (Table1Vals == null) ? "" : Table1Vals;
                context.write(new Text(vals[0]), new Text(Table1Vals + "\t"
                        + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Wrong number of arguments. Usage: <big table hdfs input> <small table hdfs input> <hdfs output>");
            System.exit(-1);
        }

        Job job = new Job(conf, "MapJoin");

        job.setJarByClass(MapJoin.class);
        job.setMapperClass(mapper.class);
        // Map-only job: the join is completed on the map side, so no reducers are needed
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.addCacheFile((new Path(args[1]).toUri()));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
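For the sample Table1 and Table2 data shown in the comment above, each output record is the station id, the station name looked up from the cached small table, and the timestamp and reading carried over from the big table. The joined result would look roughly like this (record order may vary):

011990-99999    SIHCCAJAVRI    195005150700    0
011990-99999    SIHCCAJAVRI    195005151200    22
011990-99999    SIHCCAJAVRI    195005151800    -11
012650-99999    TYNSET-HANSMOEN    194903241200    111
012650-99999    TYNSET-HANSMOEN    194903241800    78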
Reduce Side Join

In a reduce-side join, the mappers for both input files emit each record keyed by the join key; the shuffle brings all records with the same key to one reducer, which concatenates them into the joined row. It works for tables of any size, at the cost of shuffling both tables across the network.

package ReduceJoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/* user.csv file:

"ID","NAME","SEX"
"1","user1","0"
"2","user2","0"
"3","user3","0"
"4","user4","1"
"5","user5","0"
"6","user6","0"
"7","user7","1"
"8","user8","0"
"9","user9","0"

order.csv file:

"USER_ID","NAME"
"1","order1"
"2","order2"
"3","order3"
"4","order4"
"7","order7"
"8","order8"
"9","order9"
*/

public class ReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text>
    {

    // Define the reusable Text objects outside the map method to cut down on object creation per map call
    private Text key = new Text();
    private Text value = new Text();
    private String[] keyValue = null;
      
    @Override  
    protected void map(LongWritable key, Text value, Context context)  
        throws IOException, InterruptedException
    {
        // value holds one whole input line as Text, so we parse the join key out of it
        keyValue = value.toString().split(",", 2);
        this.key.set(keyValue[0]);  // use the foreign key as the MapReduce output key
        this.value.set(keyValue[1]);
        context.write(this.key, this.value);
    }
      
}
  
public static class Reduce extends Reducer<Text, Text, Text, Text>
{

    private Text value = new Text();
      
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException
    {
        StringBuilder valueStr = new StringBuilder();
          
        // Each element of values shares this key but comes from a different input file,
        // i.e. values collects the map-output values for this key across all input files
        for(Text val : values)  
        {
            valueStr.append(val);
            valueStr.append(",");
        }
          
        this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());
        context.write(key, this.value);
    }
      
}
    
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MyJoin");
          
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        //job.setCombinerClass(Reduce.class);
          
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
          
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
          
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
          
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
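As written, the reducer cannot tell whether a value came from user.csv or from order.csv, so the column order of the joined output depends on whichever order the values happen to arrive in. A common refinement is to tag each value in the mapper with the name of the file it came from and have the reducer emit the user fields before the order fields. The sketch below is only an illustration of that idea, not part of the original code; the two classes could sit inside ReduceJoin next to MapClass and Reduce, and they additionally need import org.apache.hadoop.mapreduce.lib.input.FileSplit.

    // Sketch: mapper that tags every value with its source table ("U" = user.csv, "O" = order.csv)
    public static class TaggedMapClass extends Mapper<LongWritable, Text, Text, Text>
    {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException
        {
            // Name of the file this split was read from, e.g. "user.csv" or "order.csv"
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String tag = fileName.startsWith("user") ? "U" : "O";

            String[] keyValue = value.toString().split(",", 2);
            outKey.set(keyValue[0]);
            outValue.set(tag + "#" + keyValue[1]);
            context.write(outKey, outValue);
        }
    }

    // Sketch: reducer that always emits the user fields before the order fields
    // and drops keys that do not appear in both files (an inner join)
    public static class TaggedReduce extends Reducer<Text, Text, Text, Text>
    {
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException
        {
            StringBuilder userPart = new StringBuilder();
            StringBuilder orderPart = new StringBuilder();
            for (Text val : values)
            {
                String[] tagged = val.toString().split("#", 2);
                if ("U".equals(tagged[0])) {
                    userPart.append(tagged[1]).append(",");
                } else {
                    orderPart.append(tagged[1]).append(",");
                }
            }
            if (userPart.length() > 0 && orderPart.length() > 0) {
                // userPart already ends with a comma; trim the trailing comma from orderPart
                outValue.set(userPart.toString() + orderPart.substring(0, orderPart.length() - 1));
                context.write(key, outValue);
            }
        }
    }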
