前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >小白的大数据笔记——2(mapreduce实战)

小白的大数据笔记——2(mapreduce实战)

原创
作者头像
DifficultWork
修改2021-01-27 18:11:29
5040
修改2021-01-27 18:11:29
举报
文章被收录于专栏:阶梯计划阶梯计划

1 前言

好久未更。

抱着学Flink的心,没想到先试水最基本的mapreduce了。由于项目不便于公开,所以这里故事描述会进行一些演义,尽量不影响看官们理解。

2 故事背景

电流在两个电站间进行高压传输时会有能量衰耗,明显的差异时收端电压是略低于发端电压的,这个是电压衰耗(个人杜撰)。当电压衰耗达到一个阈值时可能会对用电造成问题。

已经具备的前提:

  • 每个电站每分钟会上报一下这一分钟内收/发电压值(假设所有电站的系统已经进行时钟同步);
  • 这些上报的数据都已经存入到hbase中;
  • 电网拓扑数据以及每条电路可接受的电压衰耗阈值; 需要达到的目标: 统计出过去一年内每条电路每个小时传输的异常率(即每个小时有多少个点的电压衰耗超过了阈值)

3 问题分析

首先这是一个离线计算问题,采用mapreduce或者spark都可以,由于对效率不是很敏感,所以选择了mapreduce。

然后对问题进行分解:

  1. 首先mapreduce的输入数据来自hbase,需要扫描hbase的记录(最初考虑直接读取hbase的hfile,效率会更好,但是由于自己聚合数据更复杂,最重要的是对执行效率不敏感);
  2. 由于hbase中的记录都是每个电站单点的数据,组装成一个电路的完整数据还需要结合电网拓扑,电网拓扑作为一个静态数据需要被每个工作节点可获取;
  3. 需要做job的级联,一级job扫描数据输出每条电路每个时间点的数据,二级job对数据做聚合,输出每条电路每小时的异常率(其实也可以用一个job完成,但是考虑到逻辑清晰,分两级更合理);
  4. 输出的结果存入mysql,即每个电路每小时的异常率作为一条数据,一年下来数据未超过百万级。

4 实现方案

数据流转方案
数据流转方案

mapreduce在处理数据时不会做时间上的保序,所以需要在reducer中开辟缓存进行聚合。

5 关键代码介绍

这里不着重介绍mapper和reducer的代码了,其他博主的例子都很多了。

代码语言:txt
复制
public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // 基于HBaseConfiguration创建配置对象
    conf.set("zookeeper.znode.parent", "/hbase"); // 设置hbase zk的根目录
    conf.set("hbase.zookeeper.quorum", "127.0.0.1"); // 设置hbase zk的IP
    conf.set("hbase.zookeeper.property.clientPort", "2181"); // 设置hbase zk的端口
    conf.setLong("start-time", 1577808000000); // 设置计算范围的起始时间戳
    conf.setLong("end-time", 1609430399000); // 设置计算范围的结束时间戳
    
    TopoFetcher topo = new TopoFetcher(c.getCtrlUserName(), c.getCtrlPassword(), c.getCtrlAddr()); // 获取电网拓扑,请忽略
    topo.updateTopo();
    PhyNodeMap nodeMap = topo.getPhyNodeMap();
    ConfigurationUtil.setClass("node-map", conf, nodeMap); // 注意:Configuration不能存放对象,需要将对象序列化成json串存储,然后在工作节点取出反序列为对象
    PhyLinkMap linkMap = topo.getPhyLinkMap();
    ConfigurationUtil.setClass("link-map", conf, linkMap); // ConfigurationUtil是私有实现的

    System.setProperty("HADOOP_USER_NAME", "root");

    Path tmpDir = new Path("hdfs://127.0.0.1:9000/tmp_dir"); // 创建临时目录作为job1的输出(即job2的输入)
    FileSystem.get(conf).deleteOnExit(tmpDir); // 当主程序退出时会删除临时目录

    // 一阶段job将hbase的scan作为输入流
    List<Scan> list = new ArrayList<Scan>();
    Scan scan = new Scan();
    // scan.setCaching(1000); // 这里不设置会使用缺省cache size
    scan.setCacheBlocks(false);
    scan.setTimeRange(c.getStartTime()-120000, c.getEndTime()+120000); // 注意:设置扫描时间戳范围,这里的时间戳是hbase记录的入库时间戳,即使hbase是实时数据库,还是将扫描范围前后各加2分钟
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "voltage".getBytes()); // 设置扫描的表名
    list.add(scan);
    // job1设置
    Job job1 = Job.getInstance(conf, "job1");
    job1.setJarByClass(MyMapreduce.class);
    TableMapReduceUtil.initTableMapperJob(list, Mapper1.class, Text.class, PortPmWritable.class, job1); // 里面会执行setMapperClass
    job1.setReducerClass(Reducer1.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);
    job1.setMapOutputValueClass(SiteVoltageWritable.class); // 注意:当mapper和reducer的输出key和value一致时这里可不设置,如果设置会单独覆盖mapper
    job1.setNumReduceTasks(8); // 设置Reducer工作节点的数量,一般为CPU核数,如果不设置缺省为1
    FileOutputFormat.setOutputPath(job1, tmpDir); // 这里缺省设置OutputFormat是Text

    if (!job1.waitForCompletion(true)) {
        System.exit(1);
    }


    // 二阶段job,设置mysql数据库
    String driverClass = "com.mysql.cj.jdbc.Driver";
    String url = "jdbc:mysql://127.0.0.1:3306/testdb";
    DBConfiguration.configureDB(conf, driverClass, url, "root", "root");
    Job job2 = Job.getInstance(conf, "job2");
    job2.setJarByClass(MyMapreduce.class);
    job2.setMapperClass(Mapper2.class);
    job2.setReducerClass(Reducer2.class);
    job2.setOutputKeyClass(MyDbWritable.class);
    job2.setOutputValueClass(MyDbWritable.class);
    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job2, tmpDir); // 将hdfs目录下的文件作为输入流
    job2.setInputFormatClass(KeyValueTextInputFormat.class); // 注意:因为job1的输出是key-value格式,所以这里的输入格式要指定是key-value格式的Text
    job2.setOutputFormatClass(DBOutputFormat.class); // 设置输出格式是数据库
    DBOutputFormat.setOutput(job2, "link_unavl", "link_id", "ts", "unavl"); // 第二个参数是数据库表名,后面参数是表中列名(按顺序)
    job2.setNumReduceTasks(8);

    if (!job2.waitForCompletion(true)) {
        System.exit(1);
    }

    System.exit(0);
}

5.1 坑点

  • reducer的工作节点是可以被复用的,所以当key变更时需要对缓存进行清理;
  • 本来job1的输出value想设置为对象的,但是这样对output格式需要做很多自定义,简单处理就是把对象json序列化,然后job2中读取后再反序列化;

6 参考代码链接

从hbase读取数据写入mysql

讨论区欢迎提问^^

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 前言
  • 2 故事背景
  • 3 问题分析
  • 4 实现方案
  • 5 关键代码介绍
  • 5.1 坑点
  • 6 参考代码链接
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档