前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址

大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址

作者头像
黑泽君
发布2019-04-25 11:01:21
8100
发布2019-04-25 11:01:21
举报
文章被收录于专栏:黑泽君的专栏黑泽君的专栏

二十、数据分析

20.1、统计表

通过表结构可以发现,只要维度id确定了,那么 new_install_users 也就确定了。

20.2、目标

  按照不同维度统计新增用户。比如:将 日、周、月 新增用户统计出来。传入的时间参数是: -date 2017-08-14

20.3、代码实现

20.3.1、Mapper
  • Step1、创建 NewInstallUsersMapper 类,outputKey 为 StatsUserDimension,outputValue 为 Text。定义全局变量,Key 和 Value 的对象。
  • Step2、覆写 map 方法,在该方法中读取 HBase 中待处理的数据,分别要包含维度的字段信息以及必有的字段信息。比如:serverTime、platformName、platformVersion、browserName、browserVersion、uuid。
  • Step3、数据过滤以及时间字符串转换。
  • Step4、构建维度信息:天维度,周维度,月维度,platform 维度[(name, version)(name, all)(all, all)],browser 维度[(browser, all) (browser, version)]。
  • Step5、设置 outputValue 的值为 uuid。
  • Step6、按照不同维度设置 outputKey。
  • Step7、将封装好的数据写入到 Mapper 的上下文对象中,输出给 Reducer。

示例代码如下: NewInstallUsersMapper.java

代码语言:javascript
复制
package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.z.transformer.common.DateEnum;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.base.BrowserDimension;
import com.z.transformer.dimension.key.base.DateDimension;
import com.z.transformer.dimension.key.base.KpiDimension;
import com.z.transformer.dimension.key.base.PlatformDimension;
import com.z.transformer.dimension.key.stats.StatsCommonDimension;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.util.TimeUtil;

/**
 * 思路:思路:HBase 读取数据 --> HBaseInputFormat --> Mapper --> Reducer --> DBOutPutFormat--> 这接写入到 MySql 中
 * 
 * @author bruce
 */
public class NewInstallUserMapper extends TableMapper<StatsUserDimension, Text> {
    // Mapper 的 OutPutKey 和 OutPutValue
    // OutPutKey = StatsUserDimension 进行用户分析的组合维度(用户基本分析维度和浏览器分析维度)
    // OutPutValue = Text uuid(字符串)

    private static final Logger logger = Logger.getLogger(NewInstallUserMapper.class);

    // 定义列族
    private byte[] family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;

    // 定义输出 key
    private StatsUserDimension outputKey = new StatsUserDimension();
    // 定义输出 value
    private Text outputValue = new Text();

    // 映射输出 key 中的 StatsCommonDimension(公用维度) 属性,方便后续封装操作
    private StatsCommonDimension statsCommonDimension = this.outputKey.getStatsCommon();

    private long date, endOfDate; // 定义运行天的起始时间戳和结束时间戳
    private long firstThisWeekOfDate, endThisWeekOfDate; // 定义运行天所属周的起始时间戳和结束时间戳
    private long firstThisMonthOfDate, firstDayOfNextMonth; // 定义运行天所属月的起始时间戳和结束时间戳

    // 创建 kpi 维度对象
    private KpiDimension newInstallUsersKpiDimension = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
    private KpiDimension browserNewInstallUsersKpiDimension = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);

    // 定义一个特殊占位的浏览器维度对象
    private BrowserDimension defaultBrowserDimension = new BrowserDimension("", "");

    // 初始化操作
    @Override
    protected void setup(Mapper<ImmutableBytesWritable, Result, StatsUserDimension, Text>.Context context)
            throws IOException, InterruptedException {
        // 1、获取参数配置项的上下文
        Configuration conf = context.getConfiguration();
        // 2、获取我们给定的运行时间参数,获取运行的是哪一天的数据
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);

        // 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
        this.date = TimeUtil.parseString2Long(date);
        // 传入时间所属当前天结束的时间戳
        this.endOfDate = this.date + GlobalConstants.DAY_OF_MILLISECONDS;
        // 传入时间所属当前周的第一天的时间戳
        this.firstThisWeekOfDate = TimeUtil.getFirstDayOfThisWeek(this.date);
        // 传入时间所属下一周的第一天的时间戳
        this.endThisWeekOfDate = TimeUtil.getFirstDayOfNextWeek(this.date);
        // 传入时间所属当前月的第一天的时间戳
        this.firstThisMonthOfDate = TimeUtil.getFirstDayOfThisMonth(this.date);
        // 传入时间所属下一月的第一天的时间戳
        this.firstDayOfNextMonth = TimeUtil.getFirstDayOfNextMonth(this.date);
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // 1、获取属性,参数值,即读取 HBase 中的数据:serverTime、platformName、platformVersion、browserName、browserVersion、uuid
        String serverTime = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)));
        String platformName = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
        String platformVersion = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
        String browserName = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)));
        String browserVersion = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)));
        String uuid = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));

        // 2、针对数据进行简单过滤(实际开发中过滤条件更多)
        if (StringUtils.isBlank(platformName) || StringUtils.isBlank(uuid)) {
            logger.debug("数据格式异常,直接过滤掉数据:" + platformName);
            return; // 过滤掉无效数据
        }

        // 属性处理
        long longOfServerTime = -1;
        try {
            longOfServerTime = Long.valueOf(serverTime); // 将字符串转换为long类型
        } catch (Exception e) {
            logger.debug("服务器时间格式异常:" + serverTime);
            return; // 服务器时间异常的数据直接过滤掉
        }

        // 3、构建维度信息
        // 获取当前服务器时间对应的当天维度的对象
        DateDimension dayOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.DAY);
        // 获取当前服务器时间对应的当周维度的对象
        DateDimension weekOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.WEEK);
        // 获取当前服务器时间对应的当月维度的对象
        DateDimension monthOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.MONTH);
        // 还可以获取 当季维度、当年维度......

        // 构建平台维度对象
        List<PlatformDimension> platforms = PlatformDimension.buildList(platformName, platformVersion);
        // 构建浏览器维度对象
        List<BrowserDimension> browsers = BrowserDimension.buildList(browserName, browserVersion);

        // 4、设置 outputValue
        this.outputValue.set(uuid);

        // 5、设置 outputKey
        for (PlatformDimension pf : platforms) {
            // 设置浏览器维度(是个空的)
            this.outputKey.setBrowser(this.defaultBrowserDimension);
            // 设置平台维度
            this.statsCommonDimension.setPlatform(pf);

            // 下面的代码是处理对应于 stats_user 表的统计数据

            // 设置 kpi 维度
            this.statsCommonDimension.setKpi(this.newInstallUsersKpiDimension);

            // 处理不同时间维度的情况
            // 处理天维度数据,要求服务器时间处于指定日期的范围:[today, endOfDate)
            if (longOfServerTime >= date && longOfServerTime < endOfDate) {
                // 设置时间维度为服务器时间当天的维度
                this.statsCommonDimension.setDate(dayOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
            if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
                // 设置时间维度为服务器时间所属周的维度
                this.statsCommonDimension.setDate(weekOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
            if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
                // 设置时间维度为服务器时间所属月的维度
                this.statsCommonDimension.setDate(monthOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 下面的代码是处理对应于 stats_device_browser 表的统计数据

            // 设置 kpi 维度
            this.statsCommonDimension.setKpi(this.browserNewInstallUsersKpiDimension);
            for (BrowserDimension br : browsers) {
                // 设置浏览器维度
                this.outputKey.setBrowser(br);

                // 处理不同时间维度的情况
                // 处理天维度数据,要求当前事件的服务器时间处于指定日期的范围内,[今天0点, 明天0点)
                if (longOfServerTime >= date && longOfServerTime < endOfDate) {
                    // 设置时间维度为服务器时间当天的维度
                    this.statsCommonDimension.setDate(dayOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }

                // 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
                if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
                    // 设置时间维度为服务器时间所属周的维度
                    this.statsCommonDimension.setDate(weekOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }

                // 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
                if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
                    // 设置时间维度为服务器时间所属月的维度
                    this.statsCommonDimension.setDate(monthOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }
            }
        }

    }
}
20.3.2、Reducer
  • Step1、创建 NewInstallUserReducer<StatsUserDimension, Text, StatsUserDimension, MapWritableValue> 类,覆写 reduce 方法。
  • Step2、统计 uuid 出现的次数,并且去重。
  • Step3、将数据拼装到 outputValue 中。
  • Step4、设置数据业务 KPI 类型,最终输出数据。

维度类结构图

我们再来回顾下大数据离线平台架构图:

示例代码如下: NewInstallUserReducer.java

代码语言:javascript
复制
package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;

public class NewInstallUserReducer extends Reducer<StatsUserDimension, Text, StatsUserDimension, MapWritableValue> {

    // 保存唯一 id 的集合 Set,用于计算新增的访客数量
    private Set<String> uniqueSets = new HashSet<String>();

    // 定义输出 value
    private MapWritableValue outputValue = new MapWritableValue();

    @Override
    protected void reduce(StatsUserDimension key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 1、统计 uuid 出现的次数,去重
        for (Text uuid : values) { // 增强 for 循环,遍历 values
            this.uniqueSets.add(uuid.toString());
        }

        // 2、输出数据拼装
        MapWritable map = new MapWritable();
        map.put(new IntWritable(-1), new IntWritable(this.uniqueSets.size()));
        this.outputValue.setValue(map);

        // 3、设置 outputValue 数据对应描述的业务指标(kpi)
        if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
            // 表示处理的是 browser new install user kpi 的计算
            this.outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
        } else if (KpiType.NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
            // 表示处理的是 new install user kpi 的计算
            this.outputValue.setKpi(KpiType.NEW_INSTALL_USER);
        }

        // 4、输出数据
        context.write(key, outputValue);
    }
}
20.3.3、Runner
  • Step1、创建 NewInstallUserRunner 类,实现 Tool 接口。
  • Step2、添加时间处理函数,用来截取参数。
  • Step3、组装 Job。
  • Step4、设置 HBase InputFormat(设置从 HBase 中读取的数据都有哪些)。
  • Step5、自定义 OutPutFormat 并设置。

示例代码如下: NewInstallUserRunner.java

代码语言:javascript
复制
package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;
import com.z.transformer.mr.TransformerMySQLOutputFormat;
import com.z.transformer.util.TimeUtil;

public class NewInstallUserRunner implements Tool {

    // 给定一个参数表示参数上下文
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new NewInstallUserRunner(), args);
            if (exitCode == 0) {
                System.out.println("运行成功");
            } else {
                System.out.println("运行失败");
            }
            System.exit(exitCode);
        } catch (Exception e) {
            System.err.println("执行异常:" + e.getMessage());
        }
    }

    @Override
    public void setConf(Configuration conf) {
        // 添加自己开发环境所有需要的其他资源属性文件
        conf.addResource("transformer-env.xml");
        conf.addResource("output-collector.xml");
        conf.addResource("query-mapping.xml");

        // 创建 HBase 的 Configuration 对象
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1、获取参数上下文对象
        Configuration conf = this.getConf();

        // 2、处理传入的参数,将参数添加到上下文中
        this.processArgs(conf, args);

        // 3、创建 Job
        Job job = Job.getInstance(conf, "new_install_users");

        // 4、设置 Job 的 jar 相关信息
        job.setJarByClass(NewInstallUserRunner.class);

        // 5、设置 IntputFormat 相关配置参数
        this.setHBaseInputConfig(job);

        // 6、设置 Mapper 相关参数
        // 在 setHBaseInputConfig 已经设置了

        // 7、设置 Reducer 相关参数
        job.setReducerClass(NewInstallUserReducer.class);
        job.setOutputKeyClass(StatsUserDimension.class);
        job.setOutputValueClass(MapWritableValue.class);

        // 8、设置 OutputFormat 相关参数,使用一个自定义的 OutputFormat
        job.setOutputFormatClass(TransformerMySQLOutputFormat.class);

        // 9、Job 提交运行
        boolean result = job.waitForCompletion(true);
        // 10、运行成功返回 0,失败返回 -1
        return result ? 0 : -1;
    }

    /**
     * 处理时间参数,如果没有传递参数的话,则默认清洗前一天的。
     * 
     * Job脚本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
     * 
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-date".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[i + 1];
                    break;
                }
            }
        }
        // 查看是否需要默认参数
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            date = TimeUtil.getYesterday(); // 默认时间是昨天
        }
        // 保存到上下文中间
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }

    /**
     * 设置从 hbase 读取数据的相关配置信息
     * 
     * @param job
     * @throws IOException
     */
    private void setHBaseInputConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();

        // 获取已经执行ETL操作的那一天的数据
        String dateStr = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14

        // 因为我们要访问 HBase 中的多张表,所以需要多个 Scan 对象,所以创建 Scan 集合
        List<Scan> scans = new ArrayList<Scan>();

        // 开始构建 Scan 集合
        // 1、构建 Hbase Scan Filter 对象
        FilterList filterList = new FilterList();
        // 2、构建只获取 Launch 事件的 Filter
        filterList.addFilter(new SingleColumnValueFilter(
                EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME, // 列族
                Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), // 事件名称
                CompareOp.EQUAL, // 等于判断
                Bytes.toBytes(EventEnum.LAUNCH.alias))); // Launch 事件的别名
        // 3、构建部分列的过滤器 Filter
        String[] columns = new String[] { 
                EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
                EventLogConstants.LOG_COLUMN_NAME_VERSION, // 平台版本
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // 浏览器版本
                EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
                EventLogConstants.LOG_COLUMN_NAME_UUID, // 访客唯一标识符 uuid
                EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME // 确保根据事件名称过滤数据有效,所以需要该列的值
        };

        // 创建 getColumnFilter 方法用于得到 Filter 对象
        // 根据列名称过滤数据的 Filter
        filterList.addFilter(this.getColumnFilter(columns));

        // 4、数据来源表所属日期是哪些
        long startDate, endDate; // Scan 的表区间属于[startDate, endDate)

        long date = TimeUtil.parseString2Long(dateStr); // 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
        long endOfDate = date + GlobalConstants.DAY_OF_MILLISECONDS; // 传入时间所属当前天结束的时间戳

        long firstDayOfWeek = TimeUtil.getFirstDayOfThisWeek(date); // 传入时间所属当前周的第一天的时间戳
        long lastDayOfWeek = TimeUtil.getFirstDayOfNextWeek(date); // 传入时间所属下一周的第一天的时间戳
        long firstDayOfMonth = TimeUtil.getFirstDayOfThisMonth(date); // 传入时间所属当前月的第一天的时间戳
        long lastDayOfMonth = TimeUtil.getFirstDayOfNextMonth(date); // 传入时间所属下一月的第一天的时间戳

        // [date,
        // [firstDayOfWeek
        // [firstDayOfMonth
        // 选择最小的时间戳作为数据输入的起始时间,date 一定大于等于其他两个 first 时间戳值

        // 获取起始时间
        startDate = Math.min(firstDayOfMonth, firstDayOfWeek);

        // 获取结束时间
        endDate = TimeUtil.getTodayInMillis() + GlobalConstants.DAY_OF_MILLISECONDS;
        if (endOfDate > lastDayOfWeek || endOfDate > lastDayOfMonth) {
            endDate = Math.max(lastDayOfMonth, lastDayOfWeek);
        } else {
            endDate = endOfDate;
        }

        // 获取连接对象,执行,这里使用 HBase 的 新 API
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = null;
        try {
            admin = connection.getAdmin();
        } catch (Exception e) {
            throw new RuntimeException("创建 HBaseAdmin 对象失败", e);
        }

        // 5、构建我们 scan 集合
        try {
            for (long begin = startDate; begin < endDate;) {
                // 格式化 HBase 的后缀
                String tableNameSuffix = TimeUtil.parseLong2String(begin, TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
                // 构建表名称:tableName = event_logs20170814
                String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix;

                // 需要先判断表存在,然后当表存在的情况下,再构建 Scan 对象
                if (admin.tableExists(TableName.valueOf(tableName))) {
                    // 表存在,进行 Scan 对象创建
                    Scan scan = new Scan();
                    // 需要扫描的 HBase 表名设置到 Scan 对象中
                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
                    // 设置过滤对象
                    scan.setFilter(filterList);
                    // 添加到 Scan 集合中
                    scans.add(scan);
                }

                // begin 累加
                begin += GlobalConstants.DAY_OF_MILLISECONDS;
            }
        } finally {
            // 关闭 Admin 连接
            try {
                admin.close();
            } catch (Exception e) {
                // nothing
            }
        }

        // 访问 HBase 表中的数据
        if (scans.isEmpty()) {
            // 没有表存在,那么 Job 运行失败
            throw new RuntimeException("HBase 中没有对应表存在:" + dateStr);
        }


        // 指定Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
        TableMapReduceUtil.initTableMapperJob(
                scans, // Scan 扫描控制器集合
                NewInstallUserMapper.class, // 设置 Mapper 类
                StatsUserDimension.class,  // 设置 Mapper 输出 key 类型
                Text.class, // 设置 Mapper 输出 value 值类型
                job,  // 设置给哪个 Job
                true); // 如果在 Windows 上本地运行,则 addDependencyJars 参数必须设置为 false,如果打成 jar 包提交 Linux 上运行设置为 true,默认为 true
    }

    /**
     * 获取一个根据列名称过滤数据的 Filter
     * 
     * @param columns
     * @return
     */
    private Filter getColumnFilter(String[] columns) {
        byte[][] prefixes = new byte[columns.length][];
        for (int i = 0; i < columns.length; i++) {
            prefixes[i] = Bytes.toBytes(columns[i]);
        }
        return new MultipleColumnPrefixFilter(prefixes);
    }
}
20.3.4、测试

Step1、使用 maven 插件:maven-shade-plugin,将第三方依赖的 jar 全部打包进去,需要在 pom.xml 中配置依赖。参考【章节 十七、工具代码导入】中的 pom.xml 文件。

代码语言:javascript
复制
1、-P local clean package(不打包第三方jar)
2、-P dev clean package install(打包第三方jar)(推荐使用这种,本案例使用这种方式)

Step2、在 hadoop-env.sh 添加内容:

代码语言:javascript
复制
[atguigu@hadoop102 hadoop]$ pwd
/opt/module/hadoop-2.7.2/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim hadoop-env.sh

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

尖叫提示:修改该配置后,需要配置分发,然后重启集群,方可生效!!!

Step3、打包成功后,将要运行的 transformer-0.0.1-SNAPSHOT.jar 拷贝至 /opt/module/hbase/lib 目录下,然后同步到其他机器或者配置分发:

代码语言:javascript
复制
同步到其他机器
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop104:/opt/module/hbase/lib/

或者配置分发
[atguigu@hadoop102 ~]$ xsync /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar

尖叫提示:如果没有同步到其他机器或者配置分发,会出现类找不到异常,如下:

代码语言:javascript
复制
执行异常:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.z.transformer.dimension.key.stats.StatsUserDimension not found

4、运行 jar 包,命令如下:

代码语言:javascript
复制
先进行数据清洗
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20

再进行统计运算
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.statistics.NewInstallUserRunner -date 2015-12-20

二十一、Hive 之 hourly 分析

尖叫提示:由于 “-” 在 HBase 的表名中允许,在 Hive 的表名中不可以是 “-”,即在 Hive 中,“-” 是特殊字符,为了方便和统一,所以我们将 “-” 的地方替换为 “_”。这样就三者统一了。即 HDFS 上存放数据的目录变为 /event_logs/2015/12/20,HBase 数据库中的表名变为 event_logs20151220,Hive 中的表名为 event_logsxxx。

21.1、目标

  分析一天 24 个时间段的新增用户、活跃用户、会话个数和会话长度四个指标,最终将结果保存到 HDFS 中,使用 sqoop 导出到 Mysql。

21.2、目标解析

  • 新增用户:分析 launch 事件中各个不同时间段的 uuid 数量
  • 活跃用户:分析 pageview 事件中各个不同时间段的 uuid 数量
  • 会话个数:分析 pageview 事件中各个不同时间段的 会话id 数量
  • 会话长度:分析 pageview 事件中各个不同时间段内所有会话时长的总和

21.3、创建 Mysql 结果表

21.4、Hive 分析

21.4.1、创建 Hive 外部表,关联 HBase 数据表
代码语言:javascript
复制
21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)
代码语言:javascript
复制
21.4.3、提取 e_pv 和 e_l 事件数据到临时表中
代码语言:javascript
复制
21.4.4、创建分析结果临时保存表
代码语言:javascript
复制
21.4.5、分析活跃访客数

Step1、具体平台,具体平台版本(platform:name, version:version)

代码语言:javascript
复制

Step2、具体平台,所有版本(platform:name, version:all)

代码语言:javascript
复制

Step3、所有平台,所有版本(platform:all, version:all)

代码语言:javascript
复制
21.4.6、分析会话长度

  将每个会话的长度先要计算出来,然后统计一个时间段的各个会话的总和。

Step1、具体平台,具体平台版本(platform:name, version:version)

代码语言:javascript
复制

Step2、具体平台,所有版本(platform:name, version:all)

代码语言:javascript
复制

Step3、所有平台,所有版本(platform:all, version:all)

代码语言:javascript
复制
21.4.7、创建最终结果表

  我们在这里需要创建一个和 Mysql 表结构一致的 Hive 表,便于后期使用 Sqoop 导出数据到 Mysql 中。

代码语言:javascript
复制
21.4.8、向结果表中插入数据

  我们需要 platform_dimension_id int, date_dimension_id int, kpi_dimension_id int 三个字段,所以我们需要使用 UDF 函数生成对应的字段。

Step1、编写 UDF 函数,见代码 Step2、编译打包 UDF 函数代码

代码语言:javascript
复制

Step3、上传 UDF 代码 jar 包到 HDFS

代码语言:javascript
复制

Step4、使用 UDF 的 jar

代码语言:javascript
复制

Step5、执行最终数据统计

代码语言:javascript
复制
21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据
代码语言:javascript
复制

二十二、常用 Maven 仓库地址

常用 Maven 仓库地址   中央库:http://repo.maven.apache.org/maven2/   CDN库:https://repository.cloudera.com/artifactory/cloudera-repos/   Maven 中央仓库最近更新的 Artifact:http://maven.outofmemory.cn/   Search/Browse/Explore Maven Repository:https://mvnrepository.com/

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-04-17 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二十、数据分析
    • 20.1、统计表
      • 20.2、目标
        • 20.3、代码实现
          • 20.3.1、Mapper
          • 20.3.2、Reducer
          • 20.3.3、Runner
          • 20.3.4、测试
      • 二十一、Hive 之 hourly 分析
        • 21.1、目标
          • 21.2、目标解析
            • 21.3、创建 Mysql 结果表
              • 21.4、Hive 分析
                • 21.4.1、创建 Hive 外部表,关联 HBase 数据表
                • 21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)
                • 21.4.3、提取 e_pv 和 e_l 事件数据到临时表中
                • 21.4.4、创建分析结果临时保存表
                • 21.4.5、分析活跃访客数
                • 21.4.6、分析会话长度
                • 21.4.7、创建最终结果表
                • 21.4.8、向结果表中插入数据
                • 21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据
            • 二十二、常用 Maven 仓库地址
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档