大数据实战课程-离线日志分析实战

1课时
0学过
8分

课程评价 (0)

请对课程作出评价:
0/300

学员评价

暂无精选评价
120分钟

大数据实战课程-离线日志分析实战

离线日志分析实战练习

实验预计耗时:60分钟

1. 课程背景

1.1 课程目的

本实践课程主要分析用户在某一个浏览器上某一个小时的点击数量,核心场景是针对于电商网站的用户操作行为的分析。根据不同身份的用户在不同浏览器下的不同操作,进行用户的差异化消息推送,从而增加网站的浏览点击量以及订单数量等。

通过本课程的学习,可以让学员掌握MapReduce在数据清洗方面的应用,以及熟悉Hive在简单数据分析场景的使用。

1.2 课前知识准备

学习本课程前,学员需要掌握以下前置知识:

1、能力基础

  • Linux基本操作:掌握Linux远程登录、文件与目录管理、vim 编辑器使用等。
  • Hadoop基础:理解Hadoop基本组件的功能与原理。
  • Hive基础:Hive基本原理、HQL基本语法。
  • Java基础:掌握基本的MapReduce编程基础。

2、相关技术

  • Maven:Maven是一个项目管理工具,我们通过创建Maven工程快速获取项目所需要的jar包,并保存在本地仓库,也可以帮助我们对自己的项目进行生成jar包等项目管理操作。
  • MapReduce:MapReduce是一种编程模型,核心步骤主要包括"Map(映射)"和"Reduce(归约)"。MapReduce具有开发简单,可扩展性强,容错性强,适合PB级以上海量数据的离线处理等特点。
  • Hive:Hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。Hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能将SQL语句转变成MapReduce任务来执行。其特点是学习成本低,使MapReduce变得更加简单。

3、相关概念

  • 电商网站用户分析的目的:
    • 提高网站的点击率。
    • 为了可以进行精准的广告投放和推广。
    • 为了增加网站的浏览人数和成功支付订单的数量。
    • 为了判断一个网站的好坏。
    • 有助于电商网站之间的竞争。
  • 电商网站用户分析相关概念:
    • 访客:未登陆注册的用户即为访客。
    • 会员:已经进行登录注册的用户。
    • PV:表示页面访问的数量。
    • UV:独立访客,即一台电脑客户端为一个访客。
    • 会话:访问一个网页即为一次会话过程。
    • DV:用户访问深度,简单理解就是指一个会话里面有多少个PV(访问了多少个页面)。
    • 外链:即从其他网站的链接进入我们的网站,这个其他网站上链接即为外链。
    • 跳出率:只访问了入口页面(例如网站首页)就离开的访问量与所产生总访问量的百分比。

2. 实验环境

2.1 实验操作环境

本课程需要以下实验操作环境:

  1. 可以接入互联网的笔记本电脑或者台式机,本实验使用Windows系统
  2. 实验环境:计算机本地(具有Java开发环境)+腾讯云控制台

2.2 实验架构图

2.3 实验的数据规划表

资源名称

数据

说明

腾讯云账号

账号:XXXXXXXX、密码:XXXXXXXX

涉及产品如下:VPC、EMR

PuTTY

版本:0.73

FlumeData.1532609566082

实验数据

PuTTY下载

实验数据下载

3. 实验流程

实验共包含三个任务,首先在计算机准备实验环境,使用腾讯云弹性MapReduce服务创建一个包含Hive组件的Hadoop集群。本地开发环境请学员自行准备,本实验不涉及演示。本实验开发环境需自行安装Java,Maven,一款SSH连接工具和一款IDE开发工具,本实验使用的SSH开发工具为PuTTY,使用的IDE为IntelliJ IDEA。

准备好开发环境后,会首先针对Flume采集的源数据进行数据清洗,我们使用MapReduce程序对数据进行清洗并输出,并存放在HDFS内。

接下来我们将清洗后的数据导入Hive原始表,并依据所需要分析的字段开始数据分析,并最终实现用户事件的聚合分析。

4. 实验步骤

任务1 实验环境准备

【任务目标】

通过EMR集群的搭建练习,使学员可以熟练搭建EMR集群,快速构建实验所需大数据平台环境。如已有EMR集群,本实验步骤可跳过。

【任务步骤】

1、EMR集群选购

1.在腾讯云官网,找到弹性MapReduce首页,点击立即购买

2.可用区与软件配置如下:

配置项

配置项说明

计费模式

按量计费

地域/可用区

广州/广州四区(可根据所在地自定义选择)

产品版本

EMR-V3.0.0

必选组件

hadoop、zookeeper、knox

可选组件

hive、tez

确认配置无误后,点击下一步:硬件配置

注意:请勿选择EMR-3.0.0 tlinux,如产品版本无EMR-3.0.0版本,请更换个人主账号进行实验。

3.硬件配置如下:

配置项

配置项说明

节点高可用

不启用

Master配置1台

EMR标准型S4 / 2核8G,CBS云盘100G

Core配置2台

EMR标准型S4 / 2核8G,CBS云盘100G

集群外网

开启集群Master节点公网

集群网络

新建或选择已有的私有网络

启动高可用选项可以自定义选择,默认是选择的,如果取消需要手动取消选择。由于我们这里的实验环境仅仅是一个学习的实验环境所以这里我们将此选项取消,实际生产中要根据实际环境合理选择是否需要这个配置。

确认硬件配置信息无误后,点击下一步:基础配置

4.基础配置如下:

配置项

配置项说明

集群名称

emr-test

远程登录

开启

安全组

创建新安全组

对象存储

开启

登录密码

EMR集群云主机root用户登录的密码

确认信息无误后,点击购买,会自动跳转至集群页。图中的集群实例状态中显示集群创建中

等待10min左右,集群构建成功,截图如下:

2、第三方工具连接EMR集群

1.复制集群页的主节点外网IP,打开PuTTY创建连接,将复制的外网IP粘贴至Host Name,端口默认22,如图:

2.点击Open,第一次连接会弹出安全警告,点击是(Y)

3.接下在login as:后填写用户名为root,密码为构建EMR的时候设置的密码:

备注:这里只能使用root用户进行连接。

回车确认后,我们即可成功访问主节点实例。输入Java进程查看命令jps,可看到应用进程已经启动。

任务2 数据清洗

【任务目标】

通过下载Scala插件为使用Idea开发Spark程序做准备。

【任务步骤】

1、获取实验数据

1.创建文件夹test;

mkdir /test

2.切换到test路径下;

cd /test

3.获取实验数据集,数据为服务器收集好的用户浏览电商网站的操作行为日志数据。

wget https://course-public-resources-1252758970.cos.ap-chengdu.myqcloud.com/%E5%AE%9E%E6%88%98%E8%AF%BE/202001bigdata/6-offline/FlumeData.1532609566082

服务器收集的原始数据格式为(为方便查看已添加换行):

192.168.222.1^A 1532609481.130^A hadoop1^A /?en=e_pv& p_url=http%3A%2F%2Flocalhost%3A8080%2Ftest-aura%2Fdemo2.jsp& p_ref=http%3A%2F%2Flocalhost%3A8080%2Ftest-aura%2Fdemo.jsp& tt=%E6%B5%8B%E8%AF%95%E9%A1%B5%E9%9D%A22&

ver=1&

pl=website&

sdk=js&

u_ud=0B649B94-945B-4E7C-8F90-504BC475C56D&

u_mid=gh&

u_sd=7828F430-2901-4D85-810A-5B574015B4AB&

c_time=1532695886616&

l=zh-CN&

b_iev=Mozilla%2F5.0%20(Windows%20NT%206.1%3B%20Win64%3B%20x64%3B%20rv%3A61.0)%20Gecko%2F20100101%20Firefox%2F61.0&

b_rst=1366*768

字段和字段直接使用^A进行分隔,每个属性之间通过&符号进行分隔,主要分析的字段包括事件名称,前一个浏览地址,当前的浏览地址,浏览器版本,平台等。本次实验的表格设计如下:

  • u_sd(会话id)例:7828F430-2901-4D85-810A-5B574015B4AB
  • ip:例:192.168.222.1
  • s_time(服务端时间)例:1532609481.130
  • en(事件名称)例:e_pv(pageView)、e_crt(chargeRequestEvent)、e_e(eventDurationEvent)
  • p_url(当前页面的url)例:http%3A%2F%2Flocalhost%3A8080%2Ftest-aura%2Fdemo2.jsp
  • p_ref(上一个页面的url)例:http%3A%2F%2Flocalhost%3A8080%2Ftest-aura%2Fdemo.jsp
  • pl(平台)例:website
  • u_ud(用户/访客唯一标识符)例:0B649B94-945B-4E7C-8F90-504BC475C56D
  • c_time(客户端时间)例:1532695886616

2、MapReduce实现数据清洗

1.启动IntelliJ IDEA,创建一个Maven工程,名称为offline-test

2.代码的编写及pom.xml的配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>offline-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
    <!-- hadoop start -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.2</version>
    </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.test.ip.AnalyserLogDataRunner</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

3.创建AnalyserLogDataMapper,编写Java代码如下:

package com.test.ip;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.net.URLDecoder;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class AnalyserLogDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String logText = value.toString(); //得到数据
        Map<String, String> clientInfo = new HashMap<String, String>();
        StringBuffer wordsBuffer = new StringBuffer();
        if (StringUtils.isNotBlank(logText)) {
            String[] splits = logText.trim().split("\\^A"); //分割数据
            if (splits.length == 4) {
                clientInfo.put("ip",splits[0].trim()); // 设置ip
                clientInfo.put("s_time",splits[1].trim()); // 设置s_time
                int index = splits[3].indexOf("?");
                if (index > -1) {
                    String requestBody = splits[3].substring(index + 1); // 获取请求参数,也就是我们的收集数据
                    handleRequestBody(requestBody, clientInfo); // 处理请求参数
                } else {
                    // 数据格式异常
                    clientInfo.clear();
                }
            }
        }
        wordsBuffer.append(clientInfo.get("u_sd")).append("\t");
        wordsBuffer.append(clientInfo.get("ip")).append("\t");
        wordsBuffer.append(clientInfo.get("s_time")).append("\t");
        wordsBuffer.append(clientInfo.get("en")).append("\t");
        wordsBuffer.append(clientInfo.get("p_url")).append("\t");
        wordsBuffer.append(clientInfo.get("p_ref")).append("\t");
        wordsBuffer.append(clientInfo.get("pl")).append("\t");
        wordsBuffer.append(clientInfo.get("u_ud")).append("\t");
        wordsBuffer.append(clientInfo.get("c_time"));
        String newWords = wordsBuffer.toString();
        context.write(new Text(newWords),NullWritable.get());
    }
    public static Date parseNginxServerTime2Date(String input) {
        if (StringUtils.isNotBlank(input)) {
            try {
                long timestamp = Double.valueOf(Double.valueOf(input.trim()) * 1000).longValue();
                Calendar calendar = Calendar.getInstance();
                calendar.setTimeInMillis(timestamp);
                Date date = calendar.getTime();
                date.getTime();
            } catch (Exception e) {
            }
        }
        return null;
    }
    private static void handleRequestBody(String requestBody, Map<String, String> clientInfo) {
        if (StringUtils.isNotBlank(requestBody)) {
            String[] requestParams = requestBody.split("&"); //分割数据
            for (String param : requestParams) {
                if (StringUtils.isNotBlank(param)) {
                    int index = param.indexOf("=");
                    if (index < 0) {
                        continue;
                    }
                    String key = null, value = null;
                    try {
                        key = param.substring(0, index);
                        value = URLDecoder.decode(param.substring(index + 1), "utf-8");
                    } catch (Exception e) {
                        continue;
                    }
                    if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
                        clientInfo.put(key, value);
                    }
                }
            }
        }
    }
}

4.创建AnalyserLogDataRunner,此次编写的虽然是MapReduce程序,但仅有Mapper而不需要Reducer。编写Java代码如下:

package com.test.ip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AnalyserLogDataRunner{
    public static void main(String args[]) throws Exception{
        //创建一个job
        Job job = Job.getInstance(new Configuration());
        //主程序入口
        job.setJarByClass(AnalyserLogDataRunner.class);
        //指定map和map的输出类型
        job.setMapperClass(AnalyserLogDataMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定任务输入输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //完成任务
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

5.代码编写后,对项目进行打包,并将生成的jar包复制到D盘等待上传EMR集群。

3、上传jar包并运行

1.找到PuTTY的安装目录,在上方地址栏输入cmd并执行。

2.打开psftp传输文件,使用命令如下:

在弹出的黑窗口首先输入psftp,打开psftp工具用来传输文件;

psftp

接下来连接服务器,回车后需要输入用户名和密码;

open xxx.xxx.xxx.xxx 

用于切换远程Linux 服务器上的目录;

cd /test/

lcd命令用于切换本地的路径;

lcd D:\

上传文件;

put offline-test-1.0-SNAPSHOT.jar

命令使用可以参考下图:

3.数据上传至HDFS。

切换至hadoop用户;

su hadoop

在HDFS内创建/test文件夹;

hdfs dfs -mkdir /test

将服务器收集好的用户浏览电商网站的操作行为日志数据存储到hdfs的/test/目录;

hdfs dfs -put  /test/FlumeData.1532609566082 /test

4.使用hadoop命令运行jar包,使用命令如下:

hadoop jar jar包名称.jar 主类 输入文件(HDFS) 输出路径(HDFS)

hadoop jar offline-test-1.0-SNAPSHOT.jar /test/FlumeData.1532609566082  /test/output
image018

5.查看输出结果。

查看/test/output目录下的文件信息;

hdfs dfs -ls /test/output

查看part-r-00000文件尾行数据;

hdfs dfs -tail /test/output/part-r-00000
image019

注意:此处输出结果如果显示有null值,请确认代码是否有误。

任务3 数据分析

【任务目标】

通过Hive对清洗后的数据进行建仓以及数据分析。

【任务步骤】

1、创建原始表

1.启动hive客户端。

hive

2.创建tx;

create database tx;

3.创建Hive表event_logs;

CREATE TABLE tx.event_logs (
  u_sd string,
  ip string,
  s_time string,
  en string,
  p_url string,
  p_ref string,
  pl string,
  u_ud string,
  c_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

下图为创建Hive表的语句效果图:

image020

4.加载加载数据;

LOAD DATA INPATH '/test/output/part-r-00000' OVERWRITE INTO TABLE tx.event_logs;

5.验证:查询前10行数据;

select * from tx.event_logs limit 10;
image021

2、用户事件统计分析

1.我们希望对每个用户每个时间的事件次数进行统计,创建临时表,使用如下语句:

 create table status_hourly_tmp (
     pl string,
     date_ string,
     hour int,
     u_ud string,
     en string);

2.想要查询每个用户每天每个小时事件的相关信息,需要使用提取event_logs表的以下字段:

  • s_time(服务端时间)例:1532609481.130
  • en(事件名称)例:e_pv(pageView)、e_crt(chargeRequestEvent)、e_e(eventDurationEvent)
  • pl(平台)例:website
  • u_ud(用户/访客唯一标识符)例:0B649B94-945B-4E7C-8F90-504BC475C56D

使用SQL语句如下:

 select pl, from_unixtime(cast(s_time as int), "yyyy-MM-dd") ,hour(from_unixtime(cast(s_time as int), "yyyy-MM-dd HH:mm:ss") ),u_ud,en from event_logs;

其中函数的含义:

  • from_unixtime:格式化时间戳函数
  • cast:数据类型转换函数
  • hour:转换为小时
image022

3.将结果存储到临时表中,HQL语句如下:

from event_logs
insert overwrite table status_hourly_tmp
select pl, 
from_unixtime(cast(s_time as int), "yyyy-MM-dd") ,
hour(from_unixtime(cast(s_time as int), "yyyy-MM-dd HH:mm:ss") )
,u_ud,en;

4.开启字段名显示

set hive.cli.print.header=true;

5.对每个用户每小时的事件次数进行统计,编写代码如下:

SELECT   
u_ud,
date_,   
hour,
SUM(if(en = 'e_pv', 1, 0)) AS e_pv, 
SUM(if(en = 'e_e', 1, 0)) AS e_e, 
SUM(if(en = 'e_crt', 1, 0)) AS e_crt
FROM status_hourly_tmp
GROUP BY u_ud,date_,hour;

查询的显示结果如下,由于本次实验实用的数据量较少,最终统计结果只有一行。

image023

至此,您已完成了离线日志分析实战的全部任务,相信您对离线日志分析流程已有了基本的掌握。

5. 注意事项

如实验资源无需保留,请在实验结束后及时销毁,以免产生额外费用。