前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结

大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结

作者头像
黑泽君
发布2019-06-14 10:13:46
3.2K0
发布2019-06-14 10:13:46
举报
文章被收录于专栏:黑泽君的专栏黑泽君的专栏

第1章 项目概述

电商分析平台是对用户访问电商平台的行为进行分析。

1.1 项目简介

  本项目主要讲解一个大型电商网站后台的企业级大数据统计分析平台,该平台以 Spark 为主,对电商网站的流量进行离线和实时的分析。   该大数据分析平台对电商网站的各种用户行为(访问行为、购物行为、广告点击行为等)进行复杂的分析。用统计分析出来的数据,辅助公司中的 PM(产品经理)、数据分析师以及管理人员分析现有产品的情况,并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提升公司的业绩、营业额以及市场占有率的目标。   项目主要使用了 Spark 技术生态栈中最常用的三个技术框架,Spark CoreSpark SQLSpark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计 4 个业务模块。   项目中所有的业务功能模块都是直接从实际企业项目中抽取出来的,业务复杂度没有任何缩水,通过合理的将实际业务模块进行技术整合与改造,该项目几乎完全涵盖了 Spark Core、Spark SQL 和 Spark Streaming 这三个技术框架中大部分的功能点、知识点。

1.2 项目目标

  1、掌握电商系统中 Spark 的主要使用场景以及建设流程。   2、掌握企业级的 Spark 项目的复杂性能调优、线上故障解决经验、数据倾斜全套处理方案。   3、通过项目实战,完全将 Spark 所有技术点和知识点都应用在项目中,掌握如何灵活应用 Spark 各项技术来实现各种复杂业务需求。

1.3 业务需求简介

1.3.1 用户访问 session 统计

  用户在电商网站上,通常会有很多的访问行为,通常都是进入首页,然后可能点击首页上的一些商品,点击首页上的一些品类,也可能随时在搜索框里面搜索关键词,还可能将一些商品加入购物车,对购物车中的多个商品下订单,最后对订单中的多个商品进行支付。   用户的每一次操作,其实可以理解为一个 action,在本项目中,我们关注点击搜索下单支付这四个用户行为。   用户 session,是在电商平台的角度定义的会话概念,指的就是,从用户第一次进入首页,session 就开始了。然后在一定时间范围内,直到最后操作完(可能做了几十次、甚至上百次操作),离开网站,关闭浏览器,或者长时间没有做操作,那么 session 就结束了。 以上用户在网站内的访问过程,就称之为一次 session。简单理解,session 就是某一天某一个时间段内,某个用户对网站从打开/进入,到做了大量操作,到最后关闭浏览器的过程,就叫做 session。   session 实际上就是一个电商网站中最基本的数据。那么面向消费者/用户端的大数据分析(C端),最基本的就是面向用户访问行为/用户访问 session 的分析。   该模块主要是对用户访问 session 进行统计分析,包括 session 的聚合指标计算、按时间比例随机抽取 session、获取每天点击、下单和购买排名前 10 的品类、并获取 top10 品类的点击量排名前 10 的 session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用 Spark Core 实现。

1.3.2 页面单跳转化率统计

  页面单跳转化率是一个非常有用的统计数据。   产品经理,可以根据这个指标,去尝试分析整个网站/产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。   数据分析师,可以基于此数据,做更深一步的计算和分析。   企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,做到心里有数,可以适当调整公司的经营战略或策略。   该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用 Spark Core 实现。

1.3.3 区域热门商品离线统计

  该模块主要实现每天统计出各个区域的 top3 热门商品。   我们认为,不同地区的经济发展水平不同,地理环境及气候不同,人们的风土人情和风俗习惯不同,因此对于不同商品的需求不同,根据区域热门商品的统计,可以让公司决策层更好的对不同类型商品进行布局,使商品进入最需要他的区域。   该模块可以让企业管理层看到公司售卖的商品的整体情况,从而对公司的商品相关的战略进行调整。主要使用 Spark SQL 实现。

1.3.4 广告流量实时统计

  网站/app 中经常会给第三方平台做广告,这也是一些互联网公司的核心收入来源;当广告位招商完成后,广告会在 网站/app 的某个广告位发布出去,当用户访问 网站/app 的时候,会看到相应位置的广告,此时,有些用户可能就会去点击那个广告。   我们要获取用户点击广告的行为,并针对这一行为进行计算和统计。   用户每次点击一个广告以后,会产生相应的埋点日志;在大数据实时统计系统中,会通过某些方式将数据写入到分布式消息队列中(Kafka)。   日志发送给后台 web 服务器(nginx),nginx 将日志数据负载均衡到多个 Tomcat 服务器上,Tomcat 服务器会不断将日志数据写入 Tomcat 日志文件中,写入后,就会被日志采集客户端(比如 Flume Agent)所采集,随后写入到消息队列中(Kafka),我们的实时计算程序会从消息队列中( Kafka)去实时地拉取数据,然后对数据进行实时的计算和统计。 Kafka这个模块的意义在于,让产品经理、高管可以实时地掌握到公司打的各种广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以期望获得最好的广告收益。   该模块负责实时统计公司的广告流量,包括广告展现流量和广告点击流量。实现动态黑名单机制,以及黑名单过滤实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计实现每个区域每个广告的点击流量实时统计实现每个区域 top3 点击量的广告的统计。主要使用 Spark Streaming 实现。

第2章 项目主体架构

2.1 项目架构

  用户行为数据在网站上最简单的存在形式就是日志。网站在运行过程中会产生大量的原始日志 RAW LOG,将其存储在文件系统中,企业会将多种原始日志按照用户行为汇总成会话日志 SESSION LOG,每一个会话日志表示用户的一种反馈。   本项目分为离线分析系统实时分析系统两大模块。   在离线分析系统中,我们将模拟业务数据写入 Hive 表中,离线分析系统从 Hive 中获取数据,并根据实际需求(用户访问 Session 分析、页面单跳转化率分析、各区域热门商品统计) 对数据进行处理,最终将分析完毕的统计数据存储到 MySQL 的对应表格中。   在实时分析系统中,我们将模拟业务数据写入 Kafka 集群中, 实时分析系统从 Kafka broker 中获取数据,通过 Spark Streaming 的流式处理对广告点击流量进行实时分析,最终将统计结果存储到 MySQL 的对应表格中。

2.2 离线日志采集宏观流程(参考)

2.3 实时日志采集宏观流程(参考)

2.4 离线/实时日志采集框架

  上图是一个企业级的日志处理框架,这一框架实现了对日志信息进行采集、汇总、清洗、聚合、分析的完整过程,并将日志数据分别存储到了离线和实时数据处理模块中,使得分析系统可以通过离线和实时两个角度对数据进行分析统计,并根据统计结果指导业务平台的改良和优化。

第3章 模拟业务数据源

3.1 离线数据

举例

3.1.1 数据模型与数据说明

1、user_visit_action user_visit_action 表,存放网站或者 APP 每天的点击流数据。通俗地讲,就是用户对 网站/APP 每点击一下,就会产生一条存放在这个表里面的数据。

user_visit_action 表中的字段解析如下所示:

代码语言:javascript
复制
字段名称            说明
date                日期,代表这个用户点击行为是在哪一天发生的
user_id             用户 ID,唯一地标识某个用户
session_id          Session ID,唯一地标识某个用户的一个访问 session
page_id             页面 ID,点击了某些商品/品类,也可能是搜索了某个关键词,然后进入了某个页面,页面的 id
action_time         动作时间,这个点击行为发生的时间点
search_keyword      搜索关键词,如果用户执行的是一个搜索行为,比如说在 网站/app 中,搜索了某个关键词,然后会跳转到商品列表页面
click_category_id   点击品类 ID,可能是在网站首页,点击了某个品类(美食、电子设备、电脑)
click_product_id    点击商品 ID,可能是在网站首页,或者是在商品列表页,点击了某个商品(比如呷哺呷哺火锅 XX 路店 3 人套餐、iphone 6s)
order_category_ids  下单品类 ID,代表了可能将某些商品加入了购物车,然后一次性对购物车中的商品下了一个订单,这就代表了某次下单的行为中,有哪些商品品类,可能有 6 个商品,但是就对应了 2 个品类,比如有 3 根火腿肠(食品品类),3 个电池(日用品品类)
order_product_ids   下单商品 ID,某次下单,具体对哪些商品下的订单
pay_category_ids    付款品类 ID,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品类
pay_product_ids     付款商品 ID,支付行为下,对应的哪些具体的商品
city_id             城市 ID,代表该用户行为发生在哪个城市

2、user_info user_info 表,是一张普通的用户基本信息表;这张表中存放了 网站/APP 所有注册用户的基本信息。

user_info 表中的字段解析如下所示:

代码语言:javascript
复制
字段名称            说明
user_id             用户 ID,唯一地标识某个用户
username            用户登录名
name                用户昵称或真实姓名
age                 用户年龄
professional        用户职业
city                用户所在城市
sex                 用户性别

3、product_info product_info 表,是一张普通的商品基本信息表;这张表中存放了 网站/APP 所有商品的基本信息。

product_info 表中的字段解析如下所示:

代码语言:javascript
复制
字段名称            说明
proudct_id          商品 ID,唯一地标识某个商品
product_name        商品名称
extend_info         额外信息,例如商品为自营商品还是第三方商品

3.2 实时数据

3.2.1 数据模型与数据说明

程序每 5 秒向 Kafka 集群写入数据,格式如下: 格式 :timestamp province city userid adid 在线数据的字段解析如下所示:

代码语言:javascript
复制
字段名称            取值范围
timestamp           当前时间毫秒
userId              0 – 99
provice/city        1 – 9((0L," 北京"," 北京"),(1L," 上海"," 上海"),(2L," 南京"," 江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
adid                0 - 19

第4章 程序框架解析

新建一个 maven 工程 commerce_basic 作为父 maven 工程,引入依赖 pom.xml pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>commerce</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>commons</module>
        <module>mock</module>
    </modules>

    <!-- 声明子项目公用的配置属性 -->
    <properties>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.8</scala.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
    </properties>

    <!-- 声明并引入子项目共有的依赖 -->
    <dependencies>
        <!-- 所有子项目的日志框架 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- 具体的日志实现 -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Logging End -->
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- 引入 Spark 相关的 Jar 包 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <!-- provider 如果存在,那么运行时该 Jar 包不存在,也不会打包到最终的发布版本中,只是编译器有效 -->
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
        </dependencies>
    </dependencyManagement>

    <!-- 声明构建信息 -->
    <build>
        <!-- 声明并引入子项目共有的插件:插件就是负责到 Maven 各个声明周期的具体实现 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <!-- 所有的编译都依照 JDK1.8 来搞 -->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>

        <!-- 仅声明子项目共有的插件,如果子项目需要此插件,那么子项目需要声明 -->
        <pluginManagement>
            <plugins>
                <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <!-- 声明绑定到 maven 的 compile 阶段 -->
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- 用于项目的打包插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

4.1 mock 模块(模拟数据产生模块)

新建一个模块 maven 工程 mock 作为子 maven 工程,引入依赖 pom.xml pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>commerce</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>mock</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.atguigu</groupId>
            <artifactId>commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- Spark 的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

类(包)名称

类(包)结构图

MockDataGenerate.scala

代码语言:javascript
复制
import java.util.UUID

import commons.model.{ProductInfo, UserInfo, UserVisitAction}
import commons.utils.{DateUtils, StringUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
  * 离线模拟数据的生成
  *
  * date:是当前日期
  * age: 0 - 59
  * professionals: professional[0 - 99]
  * cities: 0 - 99
  * sex: 0 - 1
  * keywords: ("火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉")
  * categoryIds: 0 - 99
  * ProductId: 0 - 99
  */
object MockDataGenerate {
  /**
    * 模拟用户的行为信息
    *
    * @return
    */
  private def mockUserVisitActionData(): Array[UserVisitAction] = {
    val rows = ArrayBuffer[UserVisitAction]()
    val random = new Random()

    val searchKeywords = Array("华为手机", "联想笔记本", "小龙虾", "卫生纸", "吸尘器", "Lamer", "机器学习", "苹果", "洗面奶", "保温杯")
    // yyyy-MM-dd
    val date = DateUtils.getTodayDate()
    // 关注四个行为:搜索、点击、下单、支付
    val actions = Array("search", "click", "order", "pay")

    // 一共 100 个用户(有重复)
    for (i <- 0 until 100) {
      val userid = random.nextInt(100)
      // 每个用户产生 10 个 session
      for (j <- 0 until 10) {
        // 不可变的,全局的,独一无二的 128bit 长度的标识符,用于标识一个 session,体现一次会话产生的 sessionId 是独一无二的
        val sessionid = UUID.randomUUID().toString().replace("-", "")
        // 在 yyyy-MM-dd 后面添加一个随机的小时时间(0-23)
        val baseActionTime = date + " " + random.nextInt(23) // 2019-05-30 12
        // 每个 (userid + sessionid) 生成 0-100 条用户访问数据
        for (k <- 0 to random.nextInt(100)) {
          val pageid = random.nextInt(10)
          // 在 yyyy-MM-dd HH 后面添加一个随机的分钟时间和秒时间,2019-05-30 12:25:30
          val actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)))
          var searchKeyword: String = null
          var clickCategoryId: Long = -1L
          var clickProductId: Long = -1L
          var orderCategoryIds: String = null
          var orderProductIds: String = null
          var payCategoryIds: String = null
          var payProductIds: String = null
          val cityid = random.nextInt(10).toLong

          // 随机确定用户在当前 session 中的行为
          val action = actions(random.nextInt(4))

          // 根据随机产生的用户行为 action 决定对应字段的值
          action match {
            case "search" => searchKeyword = searchKeywords(random.nextInt(10))
            case "click" => clickCategoryId = random.nextInt(100).toLong
              clickProductId = String.valueOf(random.nextInt(100)).toLong
            case "order" => orderCategoryIds = random.nextInt(100).toString
              orderProductIds = random.nextInt(100).toString
            case "pay" => payCategoryIds = random.nextInt(100).toString
              payProductIds = random.nextInt(100).toString
          }

          rows += UserVisitAction(date, userid, sessionid,
            pageid, actionTime, searchKeyword,
            clickCategoryId, clickProductId,
            orderCategoryIds, orderProductIds,
            payCategoryIds, payProductIds, cityid)
        }
      }
    }

    rows.toArray
  }

  /**
    * 模拟用户信息表
    *
    * @return
    */
  private def mockUserInfo(): Array[UserInfo] = {
    val rows = ArrayBuffer[UserInfo]()

    val sexes = Array("male", "female")
    val random = new Random()

    // 随机产生 100 个用户的个人信息
    for (i <- 0 until 100) {
      val userid = i
      val username = "user" + i
      val name = "name" + i
      val age = random.nextInt(60)
      val professional = "professional" + random.nextInt(100)
      val city = "city" + random.nextInt(100)
      val sex = sexes(random.nextInt(2))

      rows += UserInfo(userid, username, name, age, professional, city, sex)
    }

    rows.toArray
  }

  /**
    * 模拟产品数据表
    *
    * @return
    */
  private def mockProductInfo(): Array[ProductInfo] = {
    val rows = ArrayBuffer[ProductInfo]()
    val random = new Random()

    val productStatus = Array(0, 1)

    // 随机产生 100 个产品信息
    for (i <- 0 until 100) {
      val productId = i
      val productName = "product" + i
      val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}" // 注意这里是 json 串

      rows += ProductInfo(productId, productName, extendInfo)
    }

    rows.toArray
  }

  /**
    * 将 DataFrame 插入到 Hive 表中
    *
    * @param spark     SparkSQL 客户端
    * @param tableName 表名
    * @param dataDF    DataFrame
    */
  private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
    spark.sql("DROP TABLE IF EXISTS " + tableName)
    dataDF.write.saveAsTable(tableName)
  }

  val USER_VISIT_ACTION_TABLE = "user_visit_action"
  val USER_INFO_TABLE = "user_info"
  val PRODUCT_INFO_TABLE = "product_info"

  /**
    * 主入口方法
    *
    * @param args 启动参数
    */
  def main(args: Array[String]): Unit = {

    // 创建 Spark 配置
    val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")

    // 创建 Spark SQL 客户端
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // 模拟数据
    val userVisitActionData = this.mockUserVisitActionData()
    val userInfoData = this.mockUserInfo()
    val productInfoData = this.mockProductInfo()

    // 将模拟数据转换为 RDD
    val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
    val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
    val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)

    // 加载 SparkSQL 的隐式转换支持
    import spark.implicits._

    // 将用户访问数据转换为 DF 保存到 Hive 表中
    val userVisitActionDF = userVisitActionRdd.toDF()
    insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)

    // 将用户信息数据转换为 DF 保存到 Hive 表中
    val userInfoDF = userInfoRdd.toDF()
    insertHive(spark, USER_INFO_TABLE, userInfoDF)

    // 将产品信息数据转换为 DF 保存到 Hive 表中
    val productInfoDF = productInfoRdd.toDF()
    insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)

    spark.close
  }
}

MockRealTimeData.scala

代码语言:javascript
复制
import java.util.Properties

import commons.conf.ConfigurationManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockRealTimeData {

  def main(args: Array[String]): Unit = {
    // 获取配置文件 commerce.properties 中的 Kafka 配置参数
    val broker = ConfigurationManager.config.getString("kafka.broker.list")
    val topic = ConfigurationManager.config.getString("kafka.topics")

    // 创建 Kafka 生产者
    val kafkaProducer = createKafkaProducer(broker)

    while (true) {
      // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
      for (item <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, item))
      }
      Thread.sleep(5000)
    }
  }

  /**
    * 实时模拟数据的生成
    *
    * 时间点: 当前时间毫秒
    * userId: 0 - 99
    * 省份、城市 ID 相同: 1 - 9
    * adid: 0 - 19
    * ((0L,"北京","北京"),(1L,"上海","上海"),(2L,"南京","江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
    *
    * 格式 :timestamp province city userid adid
    *       某个时间点 某个省份 某个城市 某个用户 某个广告
    */
  def generateMockData(): Array[String] = {
    val array = ArrayBuffer[String]()
    val random = new Random()

    // 模拟实时数据:timestamp province city userid adid
    for (i <- 0 until 50) {
      val timestamp = System.currentTimeMillis()
      val province = random.nextInt(10)
      val city = province
      val adid = random.nextInt(20)
      val userid = random.nextInt(100)

      // 拼接实时数据
      array += timestamp + " " + province + " " + city + " " + userid + " " + adid
    }

    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // 创建配置对象
    val prop = new Properties()
    // 添加配置
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // 根据配置创建 Kafka 生产者
    new KafkaProducer[String, String](prop)
  }
}

4.2 commons 模块(公共模块)

新建一个模块 maven 工程 commons 作为子 maven 工程,引入依赖 pom.xml pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>commerce</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>commons</artifactId>

    <dependencies>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-configuration2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

类(包)名称

工具类名称

类(包)结构图

ConfigurationManager.scala

代码语言:javascript
复制
package commons.conf

import org.apache.commons.configuration2.{FileBasedConfiguration, PropertiesConfiguration}
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder
import org.apache.commons.configuration2.builder.fluent.Parameters

/**
  * 配置工具类:新的读取配置文件信息的方式
  */
object ConfigurationManager {
  // 创建用于初始化配置生成器实例的参数对象
  private val params = new Parameters()

  // FileBasedConfigurationBuilder : 产生一个传入的类的实例对象
  // FileBasedConfiguration : 融合 FileBased 与 Configuration 的接口
  // PropertiesConfiguration : 从一个或者多个文件读取配置的标准配置加载器
  // configure() : 通过 params 实例初始化配置生成器
  // 向 FileBasedConfigurationBuilder() 中传入一个标准配置加载器类,生成一个加载器类的实例对象,然后通过 params 参数对其初始化
  private val builder = new FileBasedConfigurationBuilder[FileBasedConfiguration](classOf[PropertiesConfiguration])
    .configure(params.properties().setFileName("commerce.properties"))

  // 通过 getConfiguration 获取配置对象
  val config = builder.getConfiguration()
}

Constants.scala

代码语言:javascript
复制
package commons.constant

/**
  * 常量接口
  */
object Constants {
  /**
    * 项目配置相关的常量
    */
  val JDBC_DATASOURCE_SIZE = "jdbc.datasource.size"
  val JDBC_URL = "jdbc.url"
  val JDBC_USER = "jdbc.user"
  val JDBC_PASSWORD = "jdbc.password"

  val KAFKA_BROKERS = "kafka.broker.list"
  val KAFKA_TOPICS = "kafka.topics"

  /**
    * Spark 作业相关的常量
    */
  val SPARK_APP_NAME_SESSION = "UserVisitSessionAnalyzeSpark"
  val SPARK_APP_NAME_PAGE = "PageOneStepConvertRateSpark"

  /**
    * user_visit_action、user_info、product_info 表中字段对应的字段名常量
    */
  val FIELD_SESSION_ID = "sessionid"
  val FIELD_SEARCH_KEYWORDS = "searchKeywords"
  val FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds"
  val FIELD_AGE = "age"
  val FIELD_PROFESSIONAL = "professional"
  val FIELD_CITY = "city"
  val FIELD_SEX = "sex"
  val FIELD_VISIT_LENGTH = "visitLength"
  val FIELD_STEP_LENGTH = "stepLength"
  val FIELD_START_TIME = "startTime"
  val FIELD_CLICK_COUNT = "clickCount"
  val FIELD_ORDER_COUNT = "orderCount"
  val FIELD_PAY_COUNT = "payCount"
  val FIELD_CATEGORY_ID = "categoryId"

  /**
    * Spark 累加器 Key 名称常量
    */
  val SESSION_COUNT = "session_count"

  val TIME_PERIOD_1s_3s = "1s_3s"
  val TIME_PERIOD_4s_6s = "4s_6s"
  val TIME_PERIOD_7s_9s = "7s_9s"
  val TIME_PERIOD_10s_30s = "10s_30s"
  val TIME_PERIOD_30s_60s = "30s_60s"
  val TIME_PERIOD_1m_3m = "1m_3m"
  val TIME_PERIOD_3m_10m = "3m_10m"
  val TIME_PERIOD_10m_30m = "10m_30m"
  val TIME_PERIOD_30m = "30m"

  val STEP_PERIOD_1_3 = "1_3"
  val STEP_PERIOD_4_6 = "4_6"
  val STEP_PERIOD_7_9 = "7_9"
  val STEP_PERIOD_10_30 = "10_30"
  val STEP_PERIOD_30_60 = "30_60"
  val STEP_PERIOD_60 = "60"

  /**
    * task.params.json 中限制条件对应的常量字段
    */
  val TASK_PARAMS = "task.params.json"

  val PARAM_START_DATE = "startDate"
  val PARAM_END_DATE = "endDate"
  val PARAM_START_AGE = "startAge"
  val PARAM_END_AGE = "endAge"
  val PARAM_PROFESSIONALS = "professionals"
  val PARAM_CITIES = "cities"
  val PARAM_SEX = "sex"
  val PARAM_KEYWORDS = "keywords"
  val PARAM_CATEGORY_IDS = "categoryIds"
  val PARAM_TARGET_PAGE_FLOW = "targetPageFlow"
}

DataModel.scala

代码语言:javascript
复制
package commons.model

//***************** 输入表 *********************

/**
  * 用户访问动作表
  *
  * @param date               用户点击行为的日期
  * @param user_id            用户的 ID
  * @param session_id         Session 的 ID
  * @param page_id            某个页面的 ID
  * @param action_time        点击行为的时间点
  * @param search_keyword     用户搜索的关键词
  * @param click_category_id  某一个商品品类的 ID
  * @param click_product_id   某一个商品的 ID
  * @param order_category_ids 一次订单中所有品类的 ID 集合
  * @param order_product_ids  一次订单中所有商品的 ID 集合
  * @param pay_category_ids   一次支付中所有品类的 ID 集合
  * @param pay_product_ids    一次支付中所有商品的 ID 集合
  * @param city_id            城市 ID
  */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long)

/**
  * 用户信息表
  *
  * @param user_id      用户的 ID
  * @param username     用户的名称
  * @param name         用户的名字
  * @param age          用户的年龄
  * @param professional 用户的职业
  * @param city         用户所在的城市
  * @param sex          用户的性别
  */
case class UserInfo(user_id: Long,
                    username: String,
                    name: String,
                    age: Int,
                    professional: String,
                    city: String,
                    sex: String)

/**
  * 产品表
  *
  * @param product_id   商品的 ID
  * @param product_name 商品的名称
  * @param extend_info  商品额外的信息
  */
case class ProductInfo(product_id: Long,
                       product_name: String,
                       extend_info: String)

PooledMySqlClientFactory.scala

代码语言:javascript
复制
package commons.pool

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

// 创建用于处理 MySQL 查询结果的类的抽象接口
trait QueryCallback {
  def process(rs: ResultSet)
}

/**
  * MySQL 客户端代理对象
  *
  * @param jdbcUrl      MySQL URL
  * @param jdbcUser     MySQL 用户
  * @param jdbcPassword MySQL 密码
  * @param client       默认客户端实现
  */
case class MySqlProxy(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) {
  // 获取客户端连接对象
  private val mysqlClient = client getOrElse {
    DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
  }

  /**
    * 执行增删改 SQL 语句
    *
    * @param sql
    * @param params
    * @return 影响的行数
    */
  def executeUpdate(sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null

    try {
      // 第一步:关闭自动提交
      mysqlClient.setAutoCommit(false)

      // 第二步:根据传入的 sql 语句创建 prepareStatement
      pstmt = mysqlClient.prepareStatement(sql)

      // 第三步:为 prepareStatement 中的每个参数填写数值
      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          pstmt.setObject(i + 1, params(i))
        }
      }

      // 第四步:执行增删改操作
      rtn = pstmt.executeUpdate()

      // 第五步:手动提交
      mysqlClient.commit()
    } catch {
      case e: Exception => e.printStackTrace
    }

    rtn
  }

  /**
    * 执行查询 SQL 语句
    *
    * @param sql
    * @param params
    */
  def executeQuery(sql: String, params: Array[Any], queryCallback: QueryCallback) {
    var pstmt: PreparedStatement = null
    var rs: ResultSet = null

    try {
      // 第一步:根据传入的 sql 语句创建 prepareStatement
      pstmt = mysqlClient.prepareStatement(sql)

      // 第二步:为 prepareStatement 中的每个参数填写数值
      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          pstmt.setObject(i + 1, params(i))
        }
      }

      // 第三步:执行查询操作
      rs = pstmt.executeQuery()

      // 第四步:处理查询后的结果
      queryCallback.process(rs)
    } catch {
      case e: Exception => e.printStackTrace
    }
  }

  /**
    * 批量执行 SQL 语句
    *
    * @param sql
    * @param paramsList
    * @return 每条SQL语句影响的行数
    */
  def executeBatch(sql: String, paramsList: Array[Array[Any]]): Array[Int] = {
    var rtn: Array[Int] = null
    var pstmt: PreparedStatement = null
    try {
      // 第一步:关闭自动提交
      mysqlClient.setAutoCommit(false)

      pstmt = mysqlClient.prepareStatement(sql)

      // 第二步:为 prepareStatement 中的每个参数填写数值
      if (paramsList != null && paramsList.length > 0) {
        for (params <- paramsList) {
          for (i <- 0 until params.length) {
            pstmt.setObject(i + 1, params(i))
          }
          pstmt.addBatch()
        }
      }

      // 第三步:执行批量的 SQL 语句
      rtn = pstmt.executeBatch()

      // 第四步:手动提交
      mysqlClient.commit()
    } catch {
      case e: Exception => e.printStackTrace
    }

    rtn
  }

  // 关闭 MySQL 客户端
  def shutdown(): Unit = mysqlClient.close()
}

/**
  * 扩展知识:将 MySqlProxy 实例视为对象,MySqlProxy 实例的创建使用对象池进行维护
  *
  * 创建自定义工厂类,继承 BasePooledObjectFactory 工厂类,负责对象的创建、包装和销毁
  *
  * @param jdbcUrl
  * @param jdbcUser
  * @param jdbcPassword
  * @param client
  */
class PooledMySqlClientFactory(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None)
  extends BasePooledObjectFactory[MySqlProxy] with Serializable {
  // 用于池来创建对象
  override def create(): MySqlProxy = MySqlProxy(jdbcUrl, jdbcUser, jdbcPassword, client)

  // 用于池来包装对象
  override def wrap(obj: MySqlProxy): PooledObject[MySqlProxy] = new DefaultPooledObject(obj)

  // 用于池来销毁对象
  override def destroyObject(p: PooledObject[MySqlProxy]): Unit = {
    p.getObject.shutdown()
    super.destroyObject(p)
  }
}

/**
  * 创建 MySQL 池工具类
  */
object CreateMySqlPool {
  // 加载 JDBC 驱动,只需要一次
  Class.forName("com.mysql.jdbc.Driver")

  // 在 org.apache.commons.pool2.impl 中预设了三个可以直接使用的对象池:GenericObjectPool、GenericKeyedObjectPool 和 SoftReferenceObjectPool
  // 创建 genericObjectPool 为 GenericObjectPool
  // GenericObjectPool 的特点是可以设置对象池中的对象特征,包括 LIFO 方式、最大空闲数、最小空闲数、是否有效性检查等等
  private var genericObjectPool: GenericObjectPool[MySqlProxy] = null

  // 伴生对象通过 apply 完成对象的创建
  def apply(): GenericObjectPool[MySqlProxy] = {
    // 单例模式
    if (this.genericObjectPool == null) {
      this.synchronized {
        // 获取 MySQL 配置参数
        val jdbcUrl = ConfigurationManager.config.getString(Constants.JDBC_URL)
        val jdbcUser = ConfigurationManager.config.getString(Constants.JDBC_USER)
        val jdbcPassword = ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)
        val size = ConfigurationManager.config.getInt(Constants.JDBC_DATASOURCE_SIZE)

        val pooledFactory = new PooledMySqlClientFactory(jdbcUrl, jdbcUser, jdbcPassword)
        val poolConfig = {
          // 创建标准对象池配置类的实例
          val c = new GenericObjectPoolConfig
          // 设置配置对象参数
          // 设置最大对象数
          c.setMaxTotal(size)
          // 设置最大空闲对象数
          c.setMaxIdle(size)
          c
        }

        // 对象池的创建需要工厂类和配置类
        // 返回一个 GenericObjectPool 对象池
        this.genericObjectPool = new GenericObjectPool[MySqlProxy](pooledFactory, poolConfig)
      }
    }

    genericObjectPool
  }
}

Utils.scala

代码语言:javascript
复制
package commons.utils

import java.util.Date

import net.sf.json.JSONObject
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

import scala.collection.mutable

/**
  * 日期时间工具类
  * 使用 joda 实现,如果使用 Java 提供的 Date 会存在线程安全问题
  */
object DateUtils {
  val DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd")
  val TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

  val DATE_KEY_FORMAT = DateTimeFormat.forPattern("yyyyMMdd")
  val DATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyyMMddHHmm")

  /**
    * 判断一个时间是否在另一个时间之前
    *
    * @param time1 第一个时间
    * @param time2 第二个时间
    * @return 判断结果
    */
  def before(time1: String, time2: String): Boolean = {
    if (TIME_FORMAT.parseDateTime(time1).isBefore(TIME_FORMAT.parseDateTime(time2))) {
      return true
    }
    false
  }

  /**
    * 判断一个时间是否在另一个时间之后
    *
    * @param time1 第一个时间
    * @param time2 第二个时间
    * @return 判断结果
    */
  def after(time1: String, time2: String): Boolean = {
    if (TIME_FORMAT.parseDateTime(time1).isAfter(TIME_FORMAT.parseDateTime(time2))) {
      return true
    }
    false
  }

  /**
    * 计算时间差值(单位为秒)
    *
    * @param time1 时间1
    * @param time2 时间2
    * @return 差值
    */
  def minus(time1: String, time2: String): Int = {
    return (TIME_FORMAT.parseDateTime(time1).getMillis - TIME_FORMAT.parseDateTime(time2).getMillis) / 1000 toInt
  }

  /**
    * 获取年月日和小时
    *
    * @param datetime 时间(yyyy-MM-dd HH:mm:ss)
    * @return 结果(yyyy-MM-dd_HH)
    */
  def getDateHour(datetime: String): String = {
    val date = datetime.split(" ")(0)
    val hourMinuteSecond = datetime.split(" ")(1)
    val hour = hourMinuteSecond.split(":")(0)
    date + "_" + hour
  }

  /**
    * 获取当天日期(yyyy-MM-dd)
    *
    * @return 当天日期
    */
  def getTodayDate(): String = {
    DateTime.now().toString(DATE_FORMAT)
  }

  /**
    * 获取昨天的日期(yyyy-MM-dd)
    *
    * @return 昨天的日期
    */
  def getYesterdayDate(): String = {
    DateTime.now().minusDays(1).toString(DATE_FORMAT)
  }

  /**
    * 格式化日期(yyyy-MM-dd)
    *
    * @param date Date对象
    * @return 格式化后的日期
    */
  def formatDate(date: Date): String = {
    new DateTime(date).toString(DATE_FORMAT)
  }

  /**
    * 格式化时间(yyyy-MM-dd HH:mm:ss)
    *
    * @param date Date对象
    * @return 格式化后的时间
    */
  def formatTime(date: Date): String = {
    new DateTime(date).toString(TIME_FORMAT)
  }

  /**
    * 解析时间字符串
    *
    * @param time 时间字符串
    * @return Date
    */
  def parseTime(time: String): Date = {
    TIME_FORMAT.parseDateTime(time).toDate
  }

  def main(args: Array[String]): Unit = {
    print(DateUtils.parseTime("2017-10-31 20:27:53")) // Tue Oct 31 20:27:53 CST 2017
  }

  /**
    * 格式化日期 key
    * yyyyMMdd
    *
    * @param date
    * @return
    */
  def formatDateKey(date: Date): String = {
    new DateTime(date).toString(DATE_KEY_FORMAT)
  }

  /**
    * 解析日期 key
    *
    * @return
    */
  def parseDateKey(datekey: String): Date = {
    DATE_KEY_FORMAT.parseDateTime(datekey).toDate
  }

  /**
    * 格式化时间,保留到分钟级别
    * yyyyMMddHHmm
    *
    * @param date
    * @return
    */
  def formatTimeMinute(date: Date): String = {
    new DateTime(date).toString(DATE_TIME_FORMAT)
  }
}

/**
  * 数字格式化工具类
  */
object NumberUtils {
  /**
    * 格式化小数
    *
    * @param scale 四舍五入的位数
    * @return 格式化小数
    */
  def formatDouble(num: Double, scale: Int): Double = {
    val bd = BigDecimal(num)
    bd.setScale(scale, BigDecimal.RoundingMode.HALF_UP).doubleValue()
  }
}

/**
  * 参数工具类
  */
object ParamUtils {
  /**
    * 从 JSON 对象中提取参数
    *
    * @param jsonObject JSON对象
    * @return 参数
    */
  def getParam(jsonObject: JSONObject, field: String): String = {
    jsonObject.getString(field)
  }
}

/**
  * 字符串工具类
  *
  */
object StringUtils {
  /**
    * 判断字符串是否为空
    *
    * @param str 字符串
    * @return 是否为空
    */
  def isEmpty(str: String): Boolean = {
    str == null || "".equals(str)
  }

  /**
    * 判断字符串是否不为空
    *
    * @param str 字符串
    * @return 是否不为空
    */
  def isNotEmpty(str: String): Boolean = {
    str != null && !"".equals(str)
  }

  /**
    * 截断字符串两侧的逗号
    *
    * @param str 字符串
    * @return 字符串
    */
  def trimComma(str: String): String = {
    var result = ""
    if (str.startsWith(",")) {
      result = str.substring(1)
    }
    if (str.endsWith(",")) {
      result = str.substring(0, str.length() - 1)
    }
    result
  }

  /**
    * 补全两位数字
    *
    * @param str
    * @return
    */
  def fulfuill(str: String): String = {
    if (str.length() == 2) {
      str
    } else {
      "0" + str
    }
  }

  /**
    * 从拼接的字符串中提取字段
    *
    * @param str       字符串
    * @param delimiter 分隔符
    * @param field     字段
    * @return 字段值
    */
  def getFieldFromConcatString(str: String, delimiter: String, field: String): String = {
    try {
      val fields = str.split(delimiter);
      for (concatField <- fields) {
        if (concatField.split("=").length == 2) {
          val fieldName = concatField.split("=")(0)
          val fieldValue = concatField.split("=")(1)
          if (fieldName.equals(field)) {
            return fieldValue
          }
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    }

    null
  }

  /**
    * 从拼接的字符串中给字段设置值
    *
    * @param str           字符串
    * @param delimiter     分隔符
    * @param field         字段名
    * @param newFieldValue 新的field值
    * @return 字段值
    */
  def setFieldInConcatString(str: String, delimiter: String, field: String, newFieldValue: String): String = {
    val fieldsMap = new mutable.HashMap[String, String]()

    for (fileds <- str.split(delimiter)) {
      val arra = fileds.split("=")
      if (arra(0).compareTo(field) == 0)
        fieldsMap += (field -> newFieldValue)
      else
        fieldsMap += (arra(0) -> arra(1))
    }

    fieldsMap.map(item => item._1 + "=" + item._2).mkString(delimiter)
  }
}

/**
  * 校验工具类
  */
object ValidUtils {
  /**
    * 校验数据中的指定字段,是否在指定范围内(范围区间)
    *
    * @param data            数据
    * @param dataField       数据字段
    * @param parameter       参数
    * @param startParamField 起始参数字段
    * @param endParamField   结束参数字段
    * @return 校验结果
    */
  def between(data: String, dataField: String, parameter: String, startParamField: String, endParamField: String): Boolean = {
    val startParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", startParamField)
    val endParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", endParamField)
    if (startParamFieldStr == null || endParamFieldStr == null) {
      return true
    }

    val startParamFieldValue = startParamFieldStr.toInt
    val endParamFieldValue = endParamFieldStr.toInt

    val dataFieldStr = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
    if (dataFieldStr != null) {
      val dataFieldValue = dataFieldStr.toInt
      if (dataFieldValue >= startParamFieldValue && dataFieldValue <= endParamFieldValue) {
        return true
      } else {
        return false
      }
    }

    false
  }

  /**
    * 校验数据中的指定字段,是否有值与参数字段的值相同(多选一)
    *
    * @param data       数据
    * @param dataField  数据字段
    * @param parameter  参数
    * @param paramField 参数字段
    * @return 校验结果
    */
  def in(data: String, dataField: String, parameter: String, paramField: String): Boolean = {
    val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
    if (paramFieldValue == null) {
      return true
    }
    val paramFieldValueSplited = paramFieldValue.split(",")

    val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
    if (dataFieldValue != null && dataFieldValue != "-1") {
      val dataFieldValueSplited = dataFieldValue.split(",")

      for (singleDataFieldValue <- dataFieldValueSplited) {
        for (singleParamFieldValue <- paramFieldValueSplited) {
          if (singleDataFieldValue.compareTo(singleParamFieldValue) == 0) {
            return true
          }
        }
      }
    }

    false
  }

  /**
    * 校验数据中的指定字段,是否在指定范围内(二选一)
    *
    * @param data       数据
    * @param dataField  数据字段
    * @param parameter  参数
    * @param paramField 参数字段
    * @return 校验结果
    */
  def equal(data: String, dataField: String, parameter: String, paramField: String): Boolean = {
    val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
    if (paramFieldValue == null) {
      return true
    }

    val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
    if (dataFieldValue != null) {
      if (dataFieldValue.compareTo(paramFieldValue) == 0) {
        return true
      }
    }

    false
  }
}

commerce.properties

代码语言:javascript
复制
# jbdc 配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://localhost:3306/commerce?useUnicode=true&characterEncoding=utf8
jdbc.user=root
jdbc.password=root

# 筛选条件的配置
# 可以使用的属性如下:
#      startDate:       格式: yyyy-MM-DD   [必选]
#      endDate:         格式: yyyy-MM-DD   [必选]
#      startAge:        范围: 0 - 59
#      endAge:          范围: 0 - 59
#      professionals:   范围:professionals[0 - 99]
#      cities:          0 - 99  ((0,"北京","华北"),(1,"上海","华东"),(2,"南京","华东"),(3,"广州","华南"),(4,"三亚","华南"),(5,"武汉","华中"),(6,"长沙","华中"),(7,"西安","西北"),(8,"成都","西南"),(9,"哈尔滨","东北"),...)
#      sex:             范围: 0 - 1
#      keywords:        范围: ("火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉")
#      categoryIds:     0 - 99,以逗号分隔
#      targetPageFlow:  0 - 99, 以逗号分隔
task.params.json={\
  startDate:"2019-06-01", \
  endDate:"2019-06-30", \
  startAge: 20, \
  endAge: 50, \
  professionals: "",  \
  cities: "", \
  sex:"", \
  keywords:"", \
  categoryIds:"", \
  targetPageFlow:"1,2,3,4,5,6,7"}

# Kafka 配置
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topics=AdRealTimeLog1

log4j.properties

代码语言:javascript
复制
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n

4.3 analyse 模块(数据分析模块)

新建一个模块 maven 工程 analyse 作为子 maven 工程,删除掉 src 目录,引入依赖 pom.xml,添加对 scala 框架的支持。 注意:在该子模块中有很多子模块。即具体需求实现的模块。 pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>commerce</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>analyse</artifactId>


</project>

analyse 模块是需求的具体实现模块, 我们将会在第 5 章中进行详细解析。

第5章 需求解析

5.1 需求一:Session 各范围访问步长、访问时长占比统计

5.1.1 需求解析

  需求一:要统计出符合筛选条件的 session 中,访问时长在 1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 以上各个范围内的 session 占比;访问步长在 1~3、4~6、7~9、10~30、30~60、60 以上各个范围内的 session 占比,并将结果保存到 MySQL 数据库中。   在计算之前需要根据查询条件筛选 session,查询条件比如搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,发起的 session。找到对应的这些用户的 session,并进行统计,之所以需要有筛选主要是可以让使用者,对感兴趣的和关系的用户群体,进行后续各种复杂业务逻辑的统计和分析,那么拿到的结果数据,就是只是针对特殊用户群体的分析结果;而不是对所有用户进行分析的泛泛的分析结果。比如说,现在某个企业高层,就是想看到用户群体中,28~35 岁的老师职业的群体,对应的一些统计和分析的结果数据,从而辅助高管进行公司战略上的决策制定。   session 访问时长,也就是说一个 session 对应的开始的 action 到结束的 action 之间的时间范围;还有,就是访问步长,指的是,一个 session 执行期间内,依次点击过多少个页面,比如说,一次 session 维持了 1 分钟,那么访问时长就是 1m,然后在这 1 分钟内,点击了 10 个页面,那么 session 的访问步长,就是 10。   比如说,符合第一步筛选出来的 session 的数量大概是有 1000 万个。那么里面,我们要计算出,访问时长在 1s~3s 内的 session 的数量,并除以符合条件的总 session 数量(比如 1000 万),比如是 100 万/1000 万,那么 1s~3s 内的 session 占比就是 10%。依次类推,这里说的统计,就是这个意思。   这个功能可以让人从全局的角度看到,符合某些条件的用户群体使用我们的产品的一些习惯。比如大多数人,到底是会在产品中停留多长时间,大多数人,会在一次使用产品的过程中,访问多少个页面。那么对于使用者来说, 有一个全局和清晰的认识。

5.1.2 数据源解析
5.1.3 数据结构解析

1、UserVisitAction 样例类

代码语言:javascript
复制
/**
  * 用户访问动作表
  *
  * @param date               用户点击行为的日期
  * @param user_id            用户的 ID
  * @param session_id         Session 的 ID
  * @param page_id            某个页面的 ID
  * @param action_time        点击行为的时间点
  * @param search_keyword     用户搜索的关键词
  * @param click_category_id  某一个商品品类的 ID
  * @param click_product_id   某一个商品的 ID
  * @param order_category_ids 一次订单中所有品类的 ID 集合
  * @param order_product_ids  一次订单中所有商品的 ID 集合
  * @param pay_category_ids   一次支付中所有品类的 ID 集合
  * @param pay_product_ids    一次支付中所有商品的 ID 集合
  * @param city_id            城市 ID
  */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long
                          )

2、UserInfo 样例类

代码语言:javascript
复制
/**
  * 用户信息表
  *
  * @param user_id      用户的 ID
  * @param username     用户的名称
  * @param name         用户的名字
  * @param age          用户的年龄
  * @param professional 用户的职业
  * @param city         用户所在的城市
  * @param sex          用户的性别
  */
case class UserInfo(user_id: Long,
                    username: String,
                    name: String,
                    age: Int,
                    professional: String,
                    city: String,
                    sex: String
                   )

为什么联立用户表?   用户表中记录了用户详细的个人信息,包括年龄、职业、城市、性别等,在实际的业务场景中,我们可能会在一段时间关注某一个群体的用户的行为,比如在某一段时间关注北京的白领们的购物行为,那么我们就可以通过联立用户表,让我们的统计数据中具有用户属性,然后根据用户属性对统计信息进行过滤,将不属于我们所关注的用户群体的用户所产生的行为数据过滤掉,这样就可以实现对指定人群的精准分析。

5.1.4 需求实现简要流程
5.1.5 需求实现详细流程
5.1.6 MySQL 存储结构解析

MySQL 写入数据格式 session_aggr_stat

代码语言:javascript
复制
-- ----------------------------
--  Table structure for `session_aggr_stat`
-- ----------------------------
DROP TABLE IF EXISTS `session_aggr_stat`;
CREATE TABLE `session_aggr_stat` (
  `taskid` varchar(255) DEFAULT NULL,
  `session_count` int(11) DEFAULT NULL,
  `visit_length_1s_3s_ratio` double DEFAULT NULL,
  `visit_length_4s_6s_ratio` double DEFAULT NULL,
  `visit_length_7s_9s_ratio` double DEFAULT NULL,
  `visit_length_10s_30s_ratio` double DEFAULT NULL,
  `visit_length_30s_60s_ratio` double DEFAULT NULL,
  `visit_length_1m_3m_ratio` double DEFAULT NULL,
  `visit_length_3m_10m_ratio` double DEFAULT NULL,
  `visit_length_10m_30m_ratio` double DEFAULT NULL,
  `visit_length_30m_ratio` double DEFAULT NULL,
  `step_length_1_3_ratio` double DEFAULT NULL,
  `step_length_4_6_ratio` double DEFAULT NULL,
  `step_length_7_9_ratio` double DEFAULT NULL,
  `step_length_10_30_ratio` double DEFAULT NULL,
  `step_length_30_60_ratio` double DEFAULT NULL,
  `step_length_60_ratio` double DEFAULT NULL,
  KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.1.7 代码解析

在模块 analyse 新建一个模块 session,引入 pom 文件,修改 src 目录名称为 scala,同时添加 scala 框架的支持。

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>analyse</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>session</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.atguigu</groupId>
            <artifactId>commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- Spark 的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </dependency>
        <!-- 引入 Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.session.UserVisitSessionAnalyze</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

代码实现示例如下: SessionStat.scala

代码语言:javascript
复制
package com.atguigu.session

import java.util.{Date, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

object SessionStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // 获取原始的动作表数据(带有过滤条件)
    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // 将用户行为信息转换为 K-V 结构,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一起,得到斧子形数据

    // 将数据进行内存缓存
    session2GroupActionRDD.cache()

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
    val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)

    // 创建自定义累加器对象
    val sessionStatisticAccumulator = new SessionStatisticAccumulator

    // 在 sparkSession 中注册自定义累加器,这样后面就可以用了
    sparkSession.sparkContext.register(sessionStatisticAccumulator)

    // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操作,即过滤掉不符合条件的数据,并根据自定义累加器 统计不同范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
    val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)

    // 必须引入任意一个 action 的算子,才能启动
    seeionId2FilterRDD.foreach(println(_))

    // 计算各个 session 的占比
    getSessionRatio(sparkSession,taskUUID, sessionStatisticAccumulator.value)
  }

  def getSessionRatio(sparkSession: SparkSession, taskUUID: String, value: mutable.HashMap[String, Int]): Unit = {
    val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble

    // 先获取各个值
    val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
    val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
    val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
    val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
    val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
    val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
    val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
    val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
    val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)

    val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)
    val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)
    val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)
    val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)
    val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)
    val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)

    // 计算比例
    val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)
    val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)
    val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)
    val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)
    val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)
    val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)
    val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)
    val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)
    val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)

    val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)
    val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)
    val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)
    val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)
    val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)
    val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)

    // 封装数据
    val stat = SessionAggrStat(taskUUID, session_count.toInt,
      visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
      visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
      visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
      step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
      step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)

    // 样例类实例 -> 数组 -> RDD
    val sessionRatioRDD = sparkSession.sparkContext.makeRDD(Array(stat))

    // 写入 MySQL 数据库中
    import sparkSession.implicits._
    sessionRatioRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_aggr_stat")
      .mode(SaveMode.Append) // 表存在就追加,表不存在就新建
      .save()
  }

  def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
    if (visitLength >= 1 && visitLength <= 3) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
    } else if (visitLength >= 4 && visitLength <= 6) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
    } else if (visitLength >= 7 && visitLength <= 9) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
    } else if (visitLength >= 10 && visitLength <= 30) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
    } else if (visitLength > 30 && visitLength <= 60) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
    } else if (visitLength > 60 && visitLength <= 180) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
    } else if (visitLength > 180 && visitLength <= 600) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
    } else if (visitLength > 600 && visitLength <= 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
    } else if (visitLength > 1800) {
      sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
    }
  }

  def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
    if (stepLength >= 1 && stepLength <= 3) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
    } else if (stepLength >= 4 && stepLength <= 6) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
    } else if (stepLength >= 7 && stepLength <= 9) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
    } else if (stepLength >= 10 && stepLength <= 30) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
    } else if (stepLength > 30 && stepLength <= 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
    } else if (stepLength > 60) {
      sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
    }
  }

  def getSessionFilterRDD(taskParam: JSONObject,
                          sessionId2FullAggrInfoRDD: RDD[(String, String)],
                          sessionStatisticAccumulator: SessionStatisticAccumulator) = {
    // 先获取所用到的过滤条件:
    val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
    val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
    val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
    val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
    val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
    val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
    val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)

    // 拼装过滤条件的字符串:
    var filterInfo = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
      (if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
      (if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +
      (if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +
      (if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +
      (if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +
      (if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")

    // 去除过滤条件字符串末尾的 "|"
    if (filterInfo.endsWith("\\|"))
      filterInfo = filterInfo.substring(0, filterInfo.length - 1)

    // 进行过滤操作(过滤自带遍历功能)
    sessionId2FullAggrInfoRDD.filter {
      case (sessionId, fullAggrInfo) =>
        var success = true

        // 如果 age 不在过滤条件范围之内,则当前 sessionId 对应的 fullAggrInfo 数据被过滤掉
        if (!ValidUtils.between(fullAggrInfo, Constants.FIELD_AGE, filterInfo, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) { // 范围用 between
          success = false
        } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS)) {
          success = false
        } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES)) {
          success = false
        } else if (!ValidUtils.equal(fullAggrInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX)) { // 二选一用 equal
          success = false
        } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS)) { // 多选一用 in
          success = false
        } else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)) {
          success = false
        }

        // 自定义累加器,统计不同范围的 访问时长 和 访问步长 的个数 以及 总的 session 个数
        if (success) {
          sessionStatisticAccumulator.add(Constants.SESSION_COUNT) // 总的 session 个数

          // 获取当前 sessionId 对应的 访问时长 和 访问步长
          val visitLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
          val stepLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong

          // 统计不同范围的 访问时长 和 访问步长 的个数
          calculateVisitLength(visitLength, sessionStatisticAccumulator)
          calculateStepLength(stepLength, sessionStatisticAccumulator)
        }

        success
    }
  }

  def getSessionFullAggrInfo(sparkSession: SparkSession,
                             session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]) = {
    // userId2PartAggrInfoRDD: RDD[(userId, partAggrInfo)]
    val userId2PartAggrInfoRDD = session2GroupActionRDD.map {
      // 使用模式匹配:当结果是 KV 对的时候尽量使用 case 模式匹配,这样更清楚,更简洁直观
      case (sessionId, iterableAction) =>
        var userId = -1L

        var startTime: Date = null
        var endTime: Date = null

        var stepLength = 0 // 有多少个 action

        val searchKeywords = new StringBuffer("") // 搜索行为
        val clickCategories = new StringBuffer("") // 点击行为

        for (action <- iterableAction) {
          if (userId == -1) {
            userId = action.user_id
          }

          val actionTime = DateUtils.parseTime(action.action_time) // action_time = "2019-05-30 18:17:11" 是字符串类型
          if (startTime == null || startTime.after(actionTime)) { // startTime 在 actionTime 的后面   正常区间:[startTime, actionTime, endTime]
            startTime = actionTime
          }

          if (endTime == null || endTime.before(actionTime)) { // endTime 在 actionTime 的前面
            endTime = actionTime
          }

          val searchKeyword = action.search_keyword
          if (StringUtils.isNotEmpty(searchKeyword) && !searchKeywords.toString.contains(searchKeyword)) {
            searchKeywords.append(searchKeyword + ",")
          }

          val clickCategoryId = action.click_category_id
          if (clickCategoryId != -1 && !clickCategories.toString.contains(clickCategoryId)) {
            clickCategories.append(clickCategoryId + ",")
          }

          stepLength += 1
        }

        // searchKeywords.toString.substring(0, searchKeywords.toString.length - 1) // 等价于下面
        val searchKw = StringUtils.trimComma(searchKeywords.toString) // 去除最后一个逗号
        val clickCg = StringUtils.trimComma(clickCategories.toString) // 去除最后一个逗号

        val visitLength = (endTime.getTime - startTime.getTime) / 1000

        // 拼装聚合数据的字符串:
        // (31,sessionid=7291cc307f96432f8da9d926fd7d88e5|searchKeywords=洗面奶,小龙虾,机器学习,苹果,华为手机|clickCategoryIds=11,93,36,66,
        // 60|visitLength=3461|stepLength=43|startTime=2019-05-30 14:01:01)
        val partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
          Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
          Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
          Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
          Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
          Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime) // 格式化时间为字符串类型

        (userId, partAggrInfo)
    }

    // user_visit_action 表联立 user_info 表,让我们的统计数据中具有用户属性
    val sql = "select * from user_info"

    import sparkSession.implicits._
    // userId2InfoRDD: RDD[(userId, UserInfo)]
    val userId2InfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

    val sessionId2FullAggrInfoRDD = userId2PartAggrInfoRDD.join(userId2InfoRDD).map {
      case (userId, (partAggrInfo, userInfo)) =>
        val age = userInfo.age
        val professional = userInfo.professional
        val sex = userInfo.sex
        val city = userInfo.city

        // 拼装最终的聚合数据字符串:
        val fullAggrInfo = partAggrInfo + "|" +
          Constants.FIELD_AGE + "=" + age + "|" +
          Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
          Constants.FIELD_SEX + "=" + sex + "|" +
          Constants.FIELD_CITY + "=" + city

        val seesionId = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)

        (seesionId, fullAggrInfo)
    }

    sessionId2FullAggrInfoRDD
  }

  def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    // 先获取所用到的过滤条件:开始日期 和 结束日期
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
    // 把所有的时间范围在 startDate 和 endDate 之间的数据查询出来
    val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"

    // 在对 DataFrame 和 Dataset 进行许多操作都需要这个包进行支持
    import sparkSession.implicits._
    sparkSession.sql(sql).as[UserVisitAction].rdd // DataFrame(Row类型) -> DataSet(样例类类型) -> rdd(样例类)
  }
}

自定义累加器 SessionStatisticAccumulator 代码如下:

代码语言:javascript
复制
package com.atguigu.session

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
  * 自定义累加器
  */
class SessionStatisticAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]]() {

  // 自定义累加器:要求要在类的里面维护一个 mutable.HashMap 结构
  val countMap = new mutable.HashMap[String, Int]()

  // 判断累加器是否为空
  override def isZero: Boolean = {
    this.countMap.isEmpty
  }

  // 复制一个一模一样的累加器
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val acc = new SessionStatisticAccumulator
    acc.countMap ++= this.countMap // 将两个 Map 拼接在一起
    acc
  }

  // 重置累加器
  override def reset(): Unit = {
    this.countMap.clear()
  }

  // 向累加器中添加 KV 对(K 存在,V 累加1,K 不存在,重新创建)
  override def add(k: String): Unit = {
    if (!this.countMap.contains(k)) {
      this.countMap += (k -> 0)
    }

    this.countMap.update(k, this.countMap(k) + 1)
  }

  // 两个累加器进行合并(先判断两个累加器是否是同一类型的,再将两个 Map 进行合并(是个小难点))
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
    other match {
      // (1 : 100).foldLeft(0) 等价于 (0 : (1 to 100))(_+_)  又等价于 { case (int1, int2) => int1 + int2 }
      // acc.countMap.foldLeft(this.countMap) 等价于 this.countMap : acc.countMap  又等价于 this.countMap 和 acc.countMap 的每一个 KV 做操作
      case acc: SessionStatisticAccumulator => acc.countMap.foldLeft(this.countMap) {
        case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0) + v))
      }
    }
  }

  override def value: mutable.HashMap[String, Int] = {
    this.countMap
  }
}

数据模型代码如下:

代码语言:javascript
复制
package com.atguigu.session

//***************** 输出表 *********************

/**
  * Session 聚合统计表
  *
  * @param taskid                     当前计算批次的 ID
  * @param session_count              所有 Session 的总和
  * @param visit_length_1s_3s_ratio   1-3s Session 访问时长占比
  * @param visit_length_4s_6s_ratio   4-6s Session 访问时长占比
  * @param visit_length_7s_9s_ratio   7-9s Session 访问时长占比
  * @param visit_length_10s_30s_ratio 10-30s Session 访问时长占比
  * @param visit_length_30s_60s_ratio 30-60s Session 访问时长占比
  * @param visit_length_1m_3m_ratio   1-3m Session 访问时长占比
  * @param visit_length_3m_10m_ratio  3-10m Session 访问时长占比
  * @param visit_length_10m_30m_ratio 10-30m Session 访问时长占比
  * @param visit_length_30m_ratio     30m Session 访问时长占比
  * @param step_length_1_3_ratio      1-3 步长占比
  * @param step_length_4_6_ratio      4-6 步长占比
  * @param step_length_7_9_ratio      7-9 步长占比
  * @param step_length_10_30_ratio    10-30 步长占比
  * @param step_length_30_60_ratio    30-60 步长占比
  * @param step_length_60_ratio       大于 60 步长占比
  */
case class SessionAggrStat(taskid: String,
                           session_count: Long,
                           visit_length_1s_3s_ratio: Double,
                           visit_length_4s_6s_ratio: Double,
                           visit_length_7s_9s_ratio: Double,
                           visit_length_10s_30s_ratio: Double,
                           visit_length_30s_60s_ratio: Double,
                           visit_length_1m_3m_ratio: Double,
                           visit_length_3m_10m_ratio: Double,
                           visit_length_10m_30m_ratio: Double,
                           visit_length_30m_ratio: Double,
                           step_length_1_3_ratio: Double,
                           step_length_4_6_ratio: Double,
                           step_length_7_9_ratio: Double,
                           step_length_10_30_ratio: Double,
                           step_length_30_60_ratio: Double,
                           step_length_60_ratio: Double)

/**
  * Session 随机抽取表
  *
  * @param taskid           当前计算批次的 ID
  * @param sessionid        抽取的 Session 的 ID
  * @param startTime        Session 的开始时间
  * @param searchKeywords   Session 的查询字段
  * @param clickCategoryIds Session 点击的类别 id 集合
  */
case class SessionRandomExtract(taskid: String,
                                sessionid: String,
                                startTime: String,
                                searchKeywords: String,
                                clickCategoryIds: String)

/**
  * Session 随机抽取详细表
  *
  * @param taskid           当前计算批次的 ID
  * @param userid           用户的 ID
  * @param sessionid        Session的 ID
  * @param pageid           某个页面的 ID
  * @param actionTime       点击行为的时间点
  * @param searchKeyword    用户搜索的关键词
  * @param clickCategoryId  某一个商品品类的 ID
  * @param clickProductId   某一个商品的 ID
  * @param orderCategoryIds 一次订单中所有品类的 ID 集合
  * @param orderProductIds  一次订单中所有商品的 ID 集合
  * @param payCategoryIds   一次支付中所有品类的 ID 集合
  * @param payProductIds    一次支付中所有商品的 ID 集合
  **/
case class SessionDetail(taskid: String,
                         userid: Long,
                         sessionid: String,
                         pageid: Long,
                         actionTime: String,
                         searchKeyword: String,
                         clickCategoryId: Long,
                         clickProductId: Long,
                         orderCategoryIds: String,
                         orderProductIds: String,
                         payCategoryIds: String,
                         payProductIds: String)

/**
  * 品类 Top10 表
  *
  * @param taskid
  * @param categoryid
  * @param clickCount
  * @param orderCount
  * @param payCount
  */
case class Top10Category(taskid: String,
                         categoryid: Long,
                         clickCount: Long,
                         orderCount: Long,
                         payCount: Long)

/**
  * Top10 Session
  *
  * @param taskid
  * @param categoryid
  * @param sessionid
  * @param clickCount
  */
case class Top10Session(taskid: String,
                        categoryid: Long,
                        sessionid: String,
                        clickCount: Long)
5.1.8 需求一实现思路整理

5.2 需求二:Session 随机抽取

5.2.1 需求解析

  在符合条件的 session 中,按照时间比例随机抽取 1000 个 session。   这个按照时间比例是什么意思呢?随机抽取本身是很简单的,但是按照时间比例,就很复杂了。比如说,这一天总共有 1000 万的 session。那么我现在总共要从这 1000 万 session 中,随机抽取出来 1000 个 session。但是这个随机不是那么简单的。需要做到如下几点要求:首先,如果这一天的 12:00~13:00 的 session 数量是 100万,那么这个小时的 session 占比就是 1/10,那么这个小时中的 100 万的 session,我们就要抽取 1/10 * 1000 = 100 个。即从这个小时的 100 万 session 中,随机抽取出 100 个 session。以此类推,其他小时的抽取也是这样做。   这个功能的作用是说,可以让使用者,能够对于符合条件的 session,按照时间比例均匀的随机采样出 1000 个 session,然后观察每个 session 具体的点击流/行为,   比如先进入了首页、然后点击了食品品类、然后点击了雨润火腿肠商品、然后搜索了火腿肠罐头的关键词、接着对王中王火腿肠下了订单、最后对订单做了支付。   之所以要做到按时间比例随机采用抽取,就是要做到,观察样本的公平性。   抽取完毕之后,需要将 Session 的相关信息和详细信息保存到 MySQL 数据库中。

5.2.2 数据源解析

  本需求的数据源来自于需求一中获取的 Session 聚合数据(fullAggrInfo)。

5.2.3 数据结构解析

SessionRandomExtract 样例类

代码语言:javascript
复制
/**
  * Session 随机抽取表
  *
  * @param taskid           当前计算批次的 ID
  * @param sessionid        抽取的 Session 的 ID
  * @param startTime        Session 的开始时间
  * @param searchKeywords   Session 的查询字段
  * @param clickCategoryIds Session 点击的类别 id 集合
  */
case class SessionRandomExtract(taskid: String,
                                sessionid: String,
                                startTime: String,
                                searchKeywords: String,
                                clickCategoryIds: String)
5.2.4 需求实现简要流程
5.2.5 需求实现详细流程
5.2.6 MySQL 存储结构解析

MySQL 写入数据格式 session_detail

代码语言:javascript
复制
-- ----------------------------
--  Table structure for `session_detail`
-- ----------------------------
DROP TABLE IF EXISTS `session_detail`; 
CREATE TABLE `session_detail` (
  `taskid` varchar(255) DEFAULT NULL,
  `userid` int(11) DEFAULT NULL,
  `sessionid` varchar(255) DEFAULT NULL,
  `pageid` int(11) DEFAULT NULL,
  `actionTime` varchar(255) DEFAULT NULL,
  `searchKeyword` varchar(255) DEFAULT NULL,
  `clickCategoryId` int(11) DEFAULT NULL,
  `clickProductId` int(11) DEFAULT NULL,
  `orderCategoryIds` varchar(255) DEFAULT NULL,
  `orderProductIds` varchar(255) DEFAULT NULL,
  `payCategoryIds` varchar(255) DEFAULT NULL,
  `payProductIds` varchar(255) DEFAULT NULL, KEY `idx_task_id` (`taskid`),
  KEY `idx_session_id` (`sessionid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

session_random_extract

代码语言:javascript
复制
-- ----------------------------
--  Table structure for `session_random_extract`
-- ----------------------------
DROP TABLE IF EXISTS `session_random_extract`; 
CREATE TABLE `session_random_extract` (
  `taskid` varchar(255) DEFAULT NULL,
  `sessionid` varchar(255) DEFAULT NULL,
  `startTime` varchar(50) DEFAULT NULL,
  `searchKeywords` varchar(255) DEFAULT NULL,
  `clickCategoryIds` varchar(255) DEFAULT NULL, 
  KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.2.7 代码解析
代码语言:javascript
复制
package com.atguigu.session

import java.util.{Date, Random, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object SessionStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************

    // 获取原始的动作表数据(带有过滤条件)
    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // 将用户行为信息转换为 K-V 结构,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一起,得到斧子形数据

    // 将数据进行内存缓存
    session2GroupActionRDD.cache()

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
    val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)

    // 创建自定义累加器对象
    val sessionStatisticAccumulator = new SessionStatisticAccumulator

    // 在 sparkSession 中注册自定义累加器,这样后面就可以用了
    sparkSession.sparkContext.register(sessionStatisticAccumulator)

    // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操作,即过滤掉不符合条件的数据,并根据自定义累加器 统计不同范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
    val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)

    // 必须引入任意一个 action 的算子,才能启动
    seeionId2FilterRDD.foreach(println(_))

    // 计算各个 session 的占比
    getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)

    // ******************** 需求二:Session 随机抽取 ********************

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
    sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)

  }

  // ******************** 需求二:Session 随机抽取 ********************
  /**
    * Session 随机抽取
    *
    * @param sparkSession
    * @param taskUUID
    * @param seeionId2FilterRDD
    */
  def sessionRandomExtract(sparkSession: SparkSession, taskUUID: String, seeionId2FilterRDD: RDD[(String, String)]): Unit = {
    // 由于是按照 时间 为 key 进行聚合,所以先将 seeionId2FilterRDD 的 key 转化为 时间

    // dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
    val dateHour2FullAggrInfoRDD = seeionId2FilterRDD.map {
      case (sessionId, fullAggrInfo) =>
        // 先从 fullAggrInfo 中提取出来 startTime
        val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
        // 得到的 startTime = "2019-05-30 18:17:11" 是字符串类型,需要转换成我们需要的格式:yyyy-MM-dd_HH
        val dateHour = DateUtils.getDateHour(startTime)

        (dateHour, fullAggrInfo)
    }

    // hourCountMap: Map[(dateHour, count)],示例:(yyyy-MM-dd_HH, 20)
    val hourCountMap = dateHour2FullAggrInfoRDD.countByKey()

    // dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
    val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()

    for ((dateHour, count) <- hourCountMap) {
      val date = dateHour.split("_")(0) // yyyy-MM-dd_HH
      val hour = dateHour.split("_")(1) // HH

      dateHourCountMap.get(date) match { // Map[(hour, count)
        case None =>
          dateHourCountMap(date) = new mutable.HashMap[String, Long]() // 先创建 1 个空的 HashMap
          dateHourCountMap(date) += (hour -> count) // 再给 HashMap 赋值
        case Some(map) =>
          dateHourCountMap(date) += (hour -> count) // 直接给 HashMap 赋值
      }
    }

    // 解决问题一:
    //   一共有多少天:dateHourCountMap.size
    //   一天抽取多少条:1000 / dateHourCountMap.size
    val extractPerDay = 1000 / dateHourCountMap.size

    // 解决问题二:
    //   一共有多少个:session:dateHourCountMap(date).values.sum
    //   一个小时有多少个:session:dateHourCountMap(date)(hour)

    val dateHourExtractIndexListMap = new mutable.HashMap[String, mutable.HashMap[String, ListBuffer[Int]]]()

    // dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
    // hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:这里面的 hourCountMap 含义发生变化了,要跟上面的最开始的 hourCountMap 区别开来
    for ((date, hourCountMap) <- dateHourCountMap) {
      // 一天共有多少个 session
      val dataCount = hourCountMap.values.sum

      dateHourExtractIndexListMap.get(date) match {
        case None =>
          dateHourExtractIndexListMap(date) = new mutable.HashMap[String, mutable.ListBuffer[Int]]()
          generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
        case Some(map) =>
          generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
      }
    }

    // 到此为止,我们获得了每个小时要抽取的 session 的 index
    // 之后在算子中使用 dateHourExtractIndexListMap 这个 Map,由于这个 Map 可能会很大,所以涉及到 广播大变量 的问题

    // 广播大变量,提升任务 task 的性能
    val dateHourExtractIndexListMapBroadcastVar = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)

    // dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
    // dateHour2GroupRDD: RDD[(dateHour, Iterable[fullAggrInfo])]
    val dateHour2GroupRDD = dateHour2FullAggrInfoRDD.groupByKey()

    // extractSessionRDD: RDD[SessionRandomExtract]
    val extractSessionRDD = dateHour2GroupRDD.flatMap {
      case (dateHour, iterableFullAggrInfo) =>
        val date = dateHour.split("_")(0)
        val hour = dateHour.split("_")(1)

        val extractIndexList = dateHourExtractIndexListMapBroadcastVar.value.get(date).get(hour)

        // 创建一个容器存储抽取的 session
        val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()
        var index = 0

        for (fullAggrInfo <- iterableFullAggrInfo) {
          if (extractIndexList.contains(index)) {
            // 提取数据,封装成所需要的样例类,并追加进 ArrayBuffer 中
            val sessionId = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
            val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
            val searchKeywords = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
            val clickCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)

            val sessionRandomExtract = SessionRandomExtract(taskUUID, sessionId, startTime, searchKeywords, clickCategoryIds)

            extractSessionArrayBuffer += sessionRandomExtract
          }
          index += 1
        }

        extractSessionArrayBuffer
    }

    // 将抽取后的数据保存到 MySQL
    import sparkSession.implicits._
    extractSessionRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("dbtable", "session_random_extract")
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .mode(SaveMode.Append)
      .save()
  }

  /**
    * 根据每个小时应该抽取的数量,来产生随机值
    *
    * @param extractPerDay 一天抽取的 seesion 个数
    * @param dataCount     当天所有的 seesion 总数
    * @param hourCountMap  每个小时的session总数
    * @param hourListMap   主要用来存放生成的随机值
    */
  def generateRandomIndexList(extractPerDay: Long,
                              dataCount: Long,
                              hourCountMap: mutable.HashMap[String, Long],
                              hourListMap: mutable.HashMap[String, mutable.ListBuffer[Int]]): Unit = {
    // 先遍历 hourCountMap,hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:这里面的 hourCountMap 含义发生变化了,要跟上面的最开始的 hourCountMap 区别开来
    for ((hour, count) <- hourCountMap) {
      // 计算一个小时抽取多少个 session
      var hourExtractCount = ((count / dataCount.toDouble) * extractPerDay).toInt
      // 避免一个小时要抽取的数量超过这个小时的总数
      if (hourExtractCount > count) {
        hourExtractCount = count.toInt
      }

      val random = new Random()

      hourListMap.get(hour) match {
        case None =>
          hourListMap(hour) = new mutable.ListBuffer[Int] // 没有 List,需要新建一个 List
          for (i <- 0 until hourExtractCount) {
            var index = random.nextInt(count.toInt) // 生成 index
            while (hourListMap(hour).contains(index)) { // 如果 index 已存在
              index = random.nextInt(count.toInt) // 则重新生成 index
            }

            // 将生成的 index 放入到 hourListMap 中
            hourListMap(hour).append(index)
          }

        case Some(list) =>
          for (i <- 0 until hourExtractCount) {
            var index = random.nextInt(count.toInt) // 生成 index
            while (hourListMap(hour).contains(index)) { // 如果 index 已存在
              index = random.nextInt(count.toInt) // 则重新生成 index
            }

            // 将生成的 index 放入到 hourListMap 中
            hourListMap(hour).append(index)
          }
      }
    }


  }

  // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
  // ......
}
5.2.8 需求二实现思路整理

5.3 需求三:Top10 热门品类统计

5.3.1 需求解析

  在符合条件的 session 中,获取点击、下单和支付数量排名前 10 的品类。   数据中的每个 session 可能都会对一些品类的商品进行点击、下单和支付等等行为,那么现在就需要获取这些 session 点击、下单和支付数量排名前 10 的最热门的品类。也就是说,要计算出所有这些 session 对各个品类的点击、下单和支付的次数, 然后按照这三个属性进行排序,获取前 10 个品类。   这个功能很重要,可以让我们明白,符合条件的用户,他最感兴趣的商品是什么种类。这个可以让公司里的人,清晰地了解到不同层次、不同类型的用户的心理和喜好。   计算完成之后,将数据保存到 MySQL 数据库中。

5.3.2 数据源解析
代码语言:javascript
复制
sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]

seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
5.3.3 数据结构解析
代码语言:javascript
复制
/**
  * 品类 Top10 表
  *
  * @param taskid
  * @param categoryid
  * @param clickCount
  * @param orderCount
  * @param payCount
  */
case class Top10Category(taskid: String,
                         categoryid: Long,
                         clickCount: Long,
                         orderCount: Long,
                         payCount: Long)
5.3.4 需求实现简要流程

5.3.5 需求实现详细流程
5.3.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `top10_category`
-- ----------------------------
DROP TABLE IF EXISTS `top10_category`;
CREATE TABLE `top10_category` (
  `taskid` varchar(255) DEFAULT NULL,
  `categoryid` int(11) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL,
  `orderCount` int(11) DEFAULT NULL,
  `payCount` int(11) DEFAULT NULL,
  KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.3.7 代码解析
代码语言:javascript
复制
package com.atguigu.session

import java.util.{Date, Random, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object SessionStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************

    // 获取原始的动作表数据(带有过滤条件)
    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // 将用户行为信息转换为 K-V 结构
    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一起,得到斧子形数据

    // 将数据进行内存缓存
    session2GroupActionRDD.cache()

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
    val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)

    // 创建自定义累加器对象
    val sessionStatisticAccumulator = new SessionStatisticAccumulator

    // 在 sparkSession 中注册自定义累加器,这样后面就可以用了
    sparkSession.sparkContext.register(sessionStatisticAccumulator)

    // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操作,即过滤掉不符合条件的数据,并根据自定义累加器 统计不同范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
    // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
    val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)

    // 必须引入任意一个 action 的算子,才能启动
    seeionId2FilterRDD.foreach(println(_))

    // 计算各个 session 的占比
    getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)

    // ******************** 需求二:Session 随机抽取 ********************

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
    sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)

    // ******************** 需求三:Top10 热门品类统计 ********************

    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]

    // join 默认是内连接,即不符合条件的不显示(即被过滤掉)

    // 获取所有符合过滤条件的原始的 UserVisitAction 数据
    val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
      case (sessionId, (userVisitAction, fullAggrInfo)) =>
        (sessionId, userVisitAction)
    }

    val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)
  }

  /**
    * Top10 热门品类统计
    *
    * @param sparkSession
    * @param taskUUID
    * @param seeionId2ActionFilterRDD 所有符合过滤条件的原始的 UserVisitAction 数据
    */
  def top10PopularCategories(sparkSession: SparkSession, taskUUID: String, seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
    // 第一步:获取所有发生过点击、下单、付款的 categoryId,注意:其中被点击的 categoryId 只有一个,被下单和被付款的 categoryId 有多个,categoryId 之间使用逗号隔开的
    var cid2CidRDD = seeionId2ActionFilterRDD.flatMap {
      case (sessionId, userVisitAction) =>
        val categoryIdBuffer = new ArrayBuffer[(Long, Long)]()

        // 提取出数据填充 ArrayBuffer
        if (userVisitAction.click_category_id != -1) { // 点击行为
          categoryIdBuffer += ((userVisitAction.click_category_id, userVisitAction.click_category_id)) // 只有第一个 key 有用,第二个 value 任何值都可以,但是不可以没有
        } else if (userVisitAction.order_category_ids != null) { // 下单行为
          for (order_category_id <- userVisitAction.order_category_ids.split(",")) {
            categoryIdBuffer += ((order_category_id.toLong, order_category_id.toLong))
          }
        } else if (userVisitAction.pay_category_ids != null) { // 付款行为
          for (pay_category_id <- userVisitAction.pay_category_ids.split(",")) {
            categoryIdBuffer += ((pay_category_id.toLong, pay_category_id.toLong))
          }
        }

        categoryIdBuffer
    }

    // 第二步:进行去重操作
    cid2CidRDD = cid2CidRDD.distinct()

    // 第三步:统计各品类 被点击的次数、被下单的次数、被付款的次数
    val cid2ClickCountRDD = getClickCount(seeionId2ActionFilterRDD)
    val cid2OrderCountRDD = getOrderCount(seeionId2ActionFilterRDD)
    val cid2PayCountRDD = getPayCount(seeionId2ActionFilterRDD)

    // 第四步:获取各个 categoryId 的点击次数、下单次数、付款次数,并进行拼装
    // cid2FullCountRDD: RDD[(cid, aggrCountInfo)]
    // (81,categoryId=81|clickCount=68|orderCount=64|payCount=72)
    val cid2FullCountRDD = getFullCount(cid2CidRDD, cid2ClickCountRDD, cid2OrderCountRDD, cid2PayCountRDD)

    // 第五步:根据点击次数、下单次数、付款次数依次排序,会用到 【二次排序】,实现自定义的二次排序的 key

    // 第六步:封装 SortKey
    val sortKey2FullCountRDD = cid2FullCountRDD.map {
      case (cid, fullCountInfo) =>
        val clickCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CLICK_COUNT).toLong
        val orderCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_ORDER_COUNT).toLong
        val payCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_PAY_COUNT).toLong

        val sortKey = SortKey(clickCount, orderCount, payCount)
        (sortKey, fullCountInfo)
    }

    // 第七步:降序排序,取出 top10 热门品类
    val top10CategoryArray = sortKey2FullCountRDD.sortByKey(false).take(10)

    // 第八步:将 Array 结构转化为 RDD,封装 Top10Category
    val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
      case (sortKey, fullCountInfo) =>
        val categoryid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        val clickCount = sortKey.clickCount
        val orderCount = sortKey.orderCount
        val payCount = sortKey.payCount

        Top10Category(taskUUID, categoryid, clickCount, orderCount, payCount)
    }

    // 第九步:写入 MySQL 数据库
    import sparkSession.implicits._
    top10CategoryRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("dbtable", "top10_category")
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .mode(SaveMode.Append)
      .save()

    top10CategoryArray
  }

  /**
    *
    * @param cid2CidRDD
    * @param cid2ClickCountRDD
    * @param cid2OrderCountRDD
    * @param cid2PayCountRDD
    * @return
    */
  def getFullCount(cid2CidRDD: RDD[(Long, Long)],
                   cid2ClickCountRDD: RDD[(Long, Long)],
                   cid2OrderCountRDD: RDD[(Long, Long)],
                   cid2PayCountRDD: RDD[(Long, Long)]) = {
    // 左外连接:不符合添加显示为空(null)

    // 4.1 所有品类id 和 被点击的品类 做左外连接
    val cid2ClickInfoRDD = cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map {
      case (cid, (categoryId, option)) =>
        val clickCount = if (option.isDefined) option.get else 0
        val aggrCountInfo = Constants.FIELD_CATEGORY_ID + "=" + cid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount

        (cid, aggrCountInfo)
    }
    // 4.2 4.1 的结果 和 被下单的品类 做左外连接
    val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map {
      case (cid, (clickInfo, option)) =>
        val orderCount = if (option.isDefined) option.get else 0
        val aggrCountInfo = clickInfo + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount

        (cid, aggrCountInfo)
    }
    // 4.3 4.2 的结果 和 被付款的品类 做左外连接
    val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map {
      case (cid, (orderInfo, option)) =>
        val payCount = if (option.isDefined) option.get else 0
        val aggrCountInfo = orderInfo + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount

        (cid, aggrCountInfo)
    }

    cid2PayInfoRDD
  }

  /**
    * 统计各品类被点击的次数
    *
    * @param seeionId2ActionFilterRDD
    */
  def getClickCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
    // 方式一:把发生过点击的 action 过滤出来
    val clickActionFilterRDD = seeionId2ActionFilterRDD.filter {
      case (sessionId, userVisitAction) =>
        userVisitAction.click_category_id != 1L
    }
    // 方式二:把发生点击的 action 过滤出来,二者等价
    // val clickActionFilterRDD2 = seeionId2ActionFilterRDD.filter(item => item._2.click_category_id != -1L)

    // 获取每种类别的点击次数
    val clickNumRDD = clickActionFilterRDD.map {
      case (sessionId, userVisitAction) =>
        (userVisitAction.click_category_id, 1L)
    }
    // 计算各个品类的点击次数
    clickNumRDD.reduceByKey(_ + _)
  }

  /**
    * 统计各品类被下单的次数
    *
    * @param seeionId2ActionFilterRDD
    */
  def getOrderCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
    // 把发生过下单的 action 过滤出来
    val orderActionFilterRDD = seeionId2ActionFilterRDD.filter {
      case (sessionId, userVisitAction) =>
        userVisitAction.order_category_ids != null
    }
    // 获取每种类别的下单次数
    val orderNumRDD = orderActionFilterRDD.flatMap {
      case (sessionId, userVisitAction) =>
        userVisitAction.order_category_ids.split(",").map(item => (item.toLong, 1L))
    }
    // 计算各个品类的下单次数
    orderNumRDD.reduceByKey(_ + _)
  }

  /**
    * 统计各品类被付款的次数
    *
    * @param seeionId2ActionFilterRDD
    */
  def getPayCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
    // 把发生过付款的 action 过滤出来
    val payActionFilterRDD = seeionId2ActionFilterRDD.filter {
      case (sessionId, userVisitAction) =>
        userVisitAction.pay_category_ids != null
    }
    // 获取每种类别的支付次数
    val payNumRDD = payActionFilterRDD.flatMap {
      case (sessionId, userVisitAction) =>
        userVisitAction.pay_category_ids.split(",").map(item => (item.toLong, 1L))
    }
    // 计算各个品类的支付次数
    payNumRDD.reduceByKey(_ + _)
  }

  // ******************** 需求二:Session 随机抽取 ********************

  // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************

}
5.3.8 需求三实现思路整理

5.4 需求四:Top10 热门品类的 Top10 活跃 Session 统计

5.4.1 需求解析

  对于排名前 10 的品类,分别获取其点击次数排名前 10 的 session。   这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 session。   这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。   计算完成之后,将数据保存到 MySQL 数据库中。

5.4.2 数据源解析
代码语言:javascript
复制
seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
top10CategoryArray: Array[(sortKey, fullCountInfo)]
5.4.3 数据结构解析
代码语言:javascript
复制
/**
  * Top10 Session
  *
  * @param taskid
  * @param categoryid
  * @param sessionid
  * @param clickCount
  */
case class Top10Session(taskid: String,
                        categoryid: Long,
                        sessionid: String,
                        clickCount: Long)

/**
  * Session 随机抽取详细表
  *
  * @param taskid           当前计算批次的 ID
  * @param userid           用户的 ID
  * @param sessionid        Session 的 ID
  * @param pageid           某个页面的 ID
  * @param actionTime       点击行为的时间点
  * @param searchKeyword    用户搜索的关键词
  * @param clickCategoryId  某一个商品品类的 ID
  * @param clickProductId   某一个商品的 ID
  * @param orderCategoryIds 一次订单中所有品类的 ID 集合
  * @param orderProductIds  一次订单中所有商品的 ID 集合
  * @param payCategoryIds   一次支付中所有品类的 ID 集合
  * @param payProductIds    一次支付中所有商品的 ID 集合
  **/
case class SessionDetail(taskid: String,
                         userid: Long,
                         sessionid: String,
                         pageid: Long,
                         actionTime: String,
                         searchKeyword: String,
                         clickCategoryId: Long,
                         clickProductId: Long,
                         orderCategoryIds: String,
                         orderProductIds: String,
                         payCategoryIds: String,
                         payProductIds: String)
5.4.4 需求实现简要流程
5.4.5 需求实现详细流程

5.4.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `top10_session`
-- ----------------------------
DROP TABLE IF EXISTS `top10_session`;
CREATE TABLE `top10_session` (
  `taskid` varchar(255) DEFAULT NULL,
  `categoryid` int(11) DEFAULT NULL,
  `sessionid` varchar(255) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.4.7 代码解析
代码语言:javascript
复制
package com.atguigu.session

import java.util.{Date, Random, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object SessionStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************

    // 获取原始的动作表数据(带有过滤条件)
    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // 将用户行为信息转换为 K-V 结构
    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一个 sessionId 的数据聚合到一起,得到斧子形数据

    // 将数据进行内存缓存
    session2GroupActionRDD.cache()

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
    val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)

    // 创建自定义累加器对象
    val sessionStatisticAccumulator = new SessionStatisticAccumulator

    // 在 sparkSession 中注册自定义累加器,这样后面就可以用了
    sparkSession.sparkContext.register(sessionStatisticAccumulator)

    // 根据过滤条件对 sessionId2FullAggrInfoRDD 进行过滤操作,即过滤掉不符合条件的数据,并根据自定义累加器 统计不同范围的 访问时长 和 访问步长 的 session 个数 以及 总的 session 个数
    // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
    val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)

    // 必须引入任意一个 action 的算子,才能启动
    seeionId2FilterRDD.foreach(println(_))

    // 计算各个 session 的占比
    getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)

    // ******************** 需求二:Session 随机抽取 ********************

    // sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到这里一个 sessionId 对应一条数据,也就是一个 fullAggrInfo
    sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)

    // ******************** 需求三:Top10 热门品类统计 ********************

    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    // seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]

    // join 默认是内连接,即不符合条件的不显示(即被过滤掉)

    // 获取所有符合过滤条件的原始的 UserVisitAction 数据
    // seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
    val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
      case (sessionId, (userVisitAction, fullAggrInfo)) =>
        (sessionId, userVisitAction)
    }

    val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)

    // ******************** 需求四:Top10 热门品类的 Top10 活跃 Session 统计 ********************

    // seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
    // top10CategoryArray: Array[(sortKey, fullCountInfo)]

    top10ActiveSession(sparkSession, taskUUID, seeionId2ActionFilterRDD, top10CategoryArray)

  }

  // ******************** 需求四:Top10 热门品类的 Top10 活跃 Session 统计 ********************

  /**
    * Top10 热门品类的 Top10 活跃 Session 统计
    *
    * @param sparkSession
    * @param taskUUID
    * @param seeionId2ActionFilterRDD
    * @param top10CategoryArray
    */
  def top10ActiveSession(sparkSession: SparkSession,
                         taskUUID: String,
                         seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)],
                         top10CategoryArray: Array[(SortKey, String)]): Unit = {
    // 第一步:获取所有点击过 Top10 热门品类的 UserVisitAction
    // 第一种方法:Join 方法,该方式需要引起 Shuffle,比较麻烦

    /*
    // 将 top10CategoryArray 转化为 RDD,然后将其 key sortKey 转化为 cid
    val cid2FullCountInfoRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
      case (sortKey, fullCountInfo) =>
        // 取出 categoryId
        val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        // 返回所需的 RDD
        (cid, fullCountInfo)
    }

    // 将 seeionId2ActionFilterRDD 的 key sessionId 转化为 cid,对其使用 map 操作即可
    val cid2ActionRDD = seeionId2ActionFilterRDD.map {
      case (sessionId, userVisitAction) =>
        val cid = userVisitAction.click_category_id
        (cid, userVisitAction)
    }

    // joinn 操作(即内连接):两边都有的才留下,否则过滤掉
    cid2FullCountInfoRDD.join(cid2ActionRDD).map {
      case (cid, (fullCountInfo, userVisitAction)) =>
        val sid = userVisitAction.session_id
        (sid, userVisitAction)
    }*/

    // 第二种方法:使用 filter
    // cidArray: Array[Long] 包含了 Top10 热门品类的 id
    val cidArray = top10CategoryArray.map {
      case (sortKey, fullCountInfo) =>
        val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        cid
    }

    // 所有符合过滤条件的,并且点击过 Top10 热门品类的 UserVisitAction
    val seeionId2ActionRDD = seeionId2ActionFilterRDD.filter {
      case (sessionId, userVisitAction) =>
        cidArray.contains(userVisitAction.click_category_id)
    }

    // 第二步:先对 所有符合过滤条件的,并且点击过 Top10 热门品类的 UserVisitAction 按照 sessionId 进行聚合
    val seeionId2GroupRDD = seeionId2ActionRDD.groupByKey()


    // 第三步:统计 每一个 sessionId 对于点击过的每一个品类的点击次数
    // cid2SessionCountRDD: RDD[(cid, sessionN=sessionCount)]
    val cid2SessionCountRDD = seeionId2GroupRDD.flatMap {
      case (sessionId, iterableUserVisitAction) =>
        // 创建 Map,用于保存当前每一个 sessionId 对于点击过的每一个品类的点击次数
        val categoryCountMap = new mutable.HashMap[Long, Long]()

        for (userVisitAction <- iterableUserVisitAction) {
          val cid = userVisitAction.click_category_id
          if (!categoryCountMap.contains(cid))
            categoryCountMap += (cid -> 0)

          categoryCountMap.update(cid, categoryCountMap(cid) + 1)
        }

        // 该 Map 记录了一个 session 对于它所有点击过的品类的点击次数
        // categoryCountMap

        for ((cid, sessionCount) <- categoryCountMap)
          yield (cid, sessionId + "=" + sessionCount)
    }

    // 第四步:对 cid2SessionCountRDD 进行聚合
    // cid2GroupRDD: RDD[(cid, Iterable[sessionN=sessionCount]))]
    // cid2GroupRDD 的每一条数据都是一个 cid 和它对应的所有点击过它的 sessionId 对它的点击次数
    val cid2GroupRDD = cid2SessionCountRDD.groupByKey()

    // 第五步:取出 top10SessionRDD: RDD[Top10Session]
    val top10SessionRDD = cid2GroupRDD.flatMap {
      case (cid, iterablesSessionCount) =>
        val sortList = iterablesSessionCount.toList.sortWith((item1, item2) => { // true: item1 放在前面
          item1.split("=")(1).toLong > item2.split("=")(1).toLong // item1: sessionCount 字符串类型 sessionIdN=count
        }).take(10)

        // 封装数据,准备写入 MySQL 数据库
        val top10Session = sortList.map {
          case item => {
            val categoryid = cid
            val sessionid = item.split("=")(0)
            val clickCount = item.split("=")(1).toLong

            Top10Session(taskUUID, categoryid, sessionid, clickCount)
          }
        }

        top10Session
    }

    // 写入 MySQL 数据库
    import sparkSession.implicits._
    top10SessionRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "top10_session")
      .mode(SaveMode.Append)
      .save()
  }

  // ******************** 需求三:Top10 热门品类统计 ********************

  // ******************** 需求二:Session 随机抽取 ********************

  // ******************** 需求一:Session 各范围访问步长、访问时长占比统计 ********************
}
5.4.8 需求三、四实现思路整理

5.5 需求五:页面单跳转化率统计

5.5.1 需求解析

  计算页面单跳转化率 什么是页面单跳转换率 比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率,比如: 计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率,我们记为 C;那么页面 5-7 的转化率怎么求呢?先需要求出符合条件的 Session 中访问页面 5 又紧接着访问了页面 7 的次数为 D,那么 D/B 即为 5-7 的单跳转化率。   产品经理,可以根据这个指标,去尝试分析整个网站、产品各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。   数据分析师,可以此数据做更深一步的计算和分析。   企业管理层, 可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。   在以下模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应的 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。需要注意的一点是,页面的访问是有先后的。

5.5.2 数据源解析
代码语言:javascript
复制
动作表
5.5.3 数据结构解析
代码语言:javascript
复制
/**
  * 用户访问动作表
  *
  * @param date               用户点击行为的日期
  * @param user_id            用户的 ID
  * @param session_id         Session 的 ID
  * @param page_id            某个页面的 ID
  * @param action_time        点击行为的时间点
  * @param search_keyword     用户搜索的关键词
  * @param click_category_id  某一个商品品类的 ID
  * @param click_product_id   某一个商品的 ID
  * @param order_category_ids 一次订单中所有品类的 ID 集合
  * @param order_product_ids  一次订单中所有商品的 ID 集合
  * @param pay_category_ids   一次支付中所有品类的 ID 集合
  * @param pay_product_ids    一次支付中所有商品的 ID 集合
  * @param city_id            城市 ID
  */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long)
5.5.4 需求实现简要流程

举例

如何做

5.5.5 需求实现详细流程

5.5.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `page_split_convert_rate`
-- ----------------------------
DROP TABLE IF EXISTS `page_split_convert_rate`;
CREATE TABLE `page_split_convert_rate` (
  `taskid` varchar(255) DEFAULT NULL,
  `convertRate` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.5.7 代码解析

在 analyse 中新建子模块 page,配置 pom.xml 文件,添加 scala 框架的支持

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>analyse</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>page</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.atguigu</groupId>
            <artifactId>commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- Spark 的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </dependency>
        <!-- 引入 Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.page.PageOneStepConvertRate</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

示例代码:

代码语言:javascript
复制
package com.atguigu.page

import java.util.UUID

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.UserVisitAction
import commons.utils.{DateUtils, ParamUtils}
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable

object PageConvertStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("pageConvert").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // ******************** 需求五:页面单跳转化率统计 ********************


    // 获取原始的动作表数据(带有过滤条件)
    // actionRDD: RDD[UserVisitAction]
    val actionRDD = getOriActionRDD(sparkSession, taskParam)

    // 将用户行为信息转换为 K-V 结构
    // sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // 将数据进行内存缓存
    sessionId2ActionRDD.persist(StorageLevel.MEMORY_ONLY)

    // 目标页面切片:将页面流路径转换为页面切片
    // targetPageFlowStr:"1,2,3,4,5,6,7"
    val targetPageFlowStr = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
    // targetPageFlowArray: Array[Long][1,2,3,4,5,6,7]
    val targetPageFlowArray = targetPageFlowStr.split(",")
    // targetPageFlowArray.slice(0, targetPageFlowArray.length - 1): [1,2,3,4,5,6]
    // targetPageFlowArray.tail: [2,3,4,5,6,7]
    // targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail): [(1,2),(2,3),(3,4),(4,5),(5,6),(6,7)]
    val targetPageSplit = targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail).map {
      case (page1, page2) =>
        (page1 + "_" + page2)
    }

    // 获取实际页面切片
    // 对 <sessionId,访问行为> RDD,做一次 groupByKey 操作,生成页面切片
    val session2GroupActionRDD = sessionId2ActionRDD.groupByKey()

    // realPageSplitNumRDD: RDD[(String, 1L)]
    val realPageSplitNumRDD = session2GroupActionRDD.flatMap {
      case (sessionId, iterableUserVisitAction) =>
        // item1: UserVisitAction
        // item2: UserVisitAction
        // sortList: List[UserVisitAction] // 排好序的 UserVisitAction
        val sortList = iterableUserVisitAction.toList.sortWith((item1, item2) => {
          DateUtils.parseTime(item1.action_time).getTime < DateUtils.parseTime(item2.action_time).getTime
        })

        // 获取 page 信息
        // pageList: List[Long]
        val pageList = sortList.map {
          case userVisitAction =>
            userVisitAction.page_id
        }

        // pageList.slice(0, pageList.length - 1): List[1,2,3,...,N-1]
        // pageList.tail: List[2,3,4,...,N]
        // pageList.slice(0, pageList.length - 1).zip(pageList.tail): List[(1,2),(2,3),(3,4),...,(N-1,N)]
        val realPageSplit = pageList.slice(0, pageList.length - 1).zip(pageList.tail).map {
          case (page1, page2) =>
            (page1 + "_" + page2)
        }

        // 过滤:留下存在于 targetPageSplit 中的页面切片
        val realPageSplitFilter = realPageSplit.filter {
          case realPageSplit =>
            targetPageSplit.contains(realPageSplit)
        }

        realPageSplitFilter.map {
          case realPageSplitFilter =>
            (realPageSplitFilter, 1L)
        }
    }

    // 聚合
    // realPageSplitCountMap; Map[(page1_page2, count)]
    val realPageSplitCountMap = realPageSplitNumRDD.countByKey()

    realPageSplitCountMap.foreach(println(_))

    val startPage = targetPageFlowArray(0).toLong

    val startPageCount = sessionId2ActionRDD.filter {
      case (sessionId, userVisitAction) =>
        userVisitAction.page_id == startPage.toLong
    }.count()

    println("哈啊哈"+ startPageCount)

    // 得到最后的统计结果
    getPageConvertRate(sparkSession, taskUUID, targetPageSplit, startPageCount, realPageSplitCountMap)
  }

  // ******************** 需求五:页面单跳转化率统计 ********************

  /**
    * 计算页面切片转化率
    *
    * @param sparkSession
    * @param taskUUID
    * @param targetPageSplit
    * @param startPageCount
    * @param realPageSplitCountMap
    */
  def getPageConvertRate(sparkSession: SparkSession,
                         taskUUID: String,
                         targetPageSplit: Array[String],
                         startPageCount: Long,
                         realPageSplitCountMap: collection.Map[String, Long]): Unit = {
    val pageSplitRatioMap = new mutable.HashMap[String, Double]()

    var lastPageCount = startPageCount.toDouble

    // 1_2,2_3,3_4,...
    for (pageSplit <- targetPageSplit) {
      // 第一次循环:currentPageSplitCount: page1_page2   lastPageCount: page1
      // 第二次循环:currentPageSplitCount: page2_page3   lastPageCount: page1_page2
      val currentPageSplitCount = realPageSplitCountMap.get(pageSplit).get.toDouble
      val rate = currentPageSplitCount / lastPageCount
      pageSplitRatioMap.put(pageSplit, rate)
      lastPageCount = currentPageSplitCount
    }

    val convertRateStr = pageSplitRatioMap.map {
      case (pageSplit, rate) =>
        pageSplit + "=" + rate
    }.mkString("|")


    // 封装数据
    val pageSplitConvertRate = PageSplitConvertRate(taskUUID, convertRateStr)

    val pageSplitConvertRateRDD = sparkSession.sparkContext.makeRDD(Array(pageSplitConvertRate))

    // 写入到 MySQL
    import sparkSession.implicits._
    pageSplitConvertRateRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("dbtable", "page_split_convert_rate")
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .mode(SaveMode.Append)
      .save()
  }

  /**
    * 根据日期范围获取对象的用户行为数据
    *
    * @param sparkSession
    * @param taskParam
    * @return
    */
  def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    // 先获取所用到的过滤条件:开始日期 和 结束日期
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
    // 把所有的时间范围在 startDate 和 endDate 之间的数据查询出来
    val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"

    // 在对 DataFrame 和 Dataset 进行许多操作都需要这个包进行支持
    import sparkSession.implicits._
    sparkSession.sql(sql).as[UserVisitAction].rdd // DataFrame(Row类型) -> DataSet(样例类类型) -> rdd(样例类)
  }
}
5.5.8 需求五实现思路整理

5.6 需求六:各区域 Top3 商品统计

5.6.1 需求解析

根据用户指定的日期查询条件范围,统计各个区域下的最热门【点击】的 top3 商品,区域信息、各个城市的信息在项目中用固定值进行配置,因为不怎么变动。

代码语言:javascript
复制
1、查询 task,获取日期范围,通过 Spark SQL,查询 user_visit_action 表中的指定日期范围内的数据,过滤出商品点击行为,click_product_id is not null、click_product_id != 'NULL'、click_product_id != 'null'、city_id、click_product_id。
2、使用 Spark SQL 从 MySQL 中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行 join, city_id、city_name、area、product_id, 将 RDD 转换成 DataFrame,注册成一个临时表。
3、Spark SQL 内置函数(case when),对 area 打标记(华东大区,A 级;华中大区,B 级;东北大区,C 级;西北大区,D 级),area_level。
4、计算出来每个区域下每个商品的点击次数,group by area, product_id 保留每个区域的城市名称列表;自定义 UDAF,group_concat_distinct()函数,聚合出来一个 city_names 字段,area、product_id、city_names、click_count。
5、join 商品明细表,hive(product_id、product_name、extend_info),extend_info 是 json 类型;自定义 UDF,get_json_object() 函数,取出其中的 product_status 字段, if() 函数(Spark SQL 内置函数),判断: 0 自营,1 第三方(area、product_id、 city_names、click_count、product_name、product_status)。
6、开窗函数,根据 area 来聚合,获取每个 area 下,click_count 排名前 3 的 product 信息:area、area_level、product_id、city_names、click_count、product_name、product_status。
7、结果写入 MySQL 表中。
5.6.2 数据源解析

商品信息表 + 用户访问行为表

5.6.3 数据结构解析

城市信息(城市 ID,城市名称,区域名称)

代码语言:javascript
复制
Array((0L, "北京", "华北"), (1L, "上海", "华东"), (2L, "南京", "华东"), (3L, "广州", "华南"), (4L, "三亚", "华南"), (5L, "武汉", "华中"), (6L, "长沙", "华中"), (7L, "西安", "西北"), (8L, "成都", "西南"), (9L, "哈尔滨", "东北"))
5.6.4 需求实现简要流程
5.6.5 需求实现详细流程

5.6.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `area_top3_product`
-- ----------------------------
DROP TABLE IF EXISTS `area_top3_product`; 
CREATE TABLE `area_top3_product` (
  `taskid` varchar(255) DEFAULT NULL,
  `area` varchar(255) DEFAULT NULL,
  `areaLevel` varchar(255) DEFAULT NULL,
  `productid` int(11) DEFAULT NULL,
  `cityInfos` varchar(255) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL,
  `productName` varchar(255) DEFAULT NULL,
  `productStatus` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.6.7 代码解析

在 analyse 中新建子模块 product,配置 pom.xml 文件,添加 scala 框架的支持

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>analyse</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.atguigu</groupId>
            <artifactId>commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- Spark 的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </dependency>
        <!-- 引入 Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.product.AreaTop3ProductApp</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

示例代码:

代码语言:javascript
复制
package com.atguigu.product

import java.util.UUID

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.utils.ParamUtils
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

object AreaTop3ProductStat {

  def main(args: Array[String]): Unit = {
    // 获取过滤条件,【为了方便,直接从配置文件中获取,企业中会从一个调度平台获取】
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    // 获取过滤条件对应的 JsonObject 对象
    val taskParam = JSONObject.fromObject(jsonStr)

    // 创建全局唯一的主键,每次执行 main 函数都会生成一个独一无二的 taskUUID,来区分不同任务,作为写入 MySQL 数据库中那张表的主键
    val taskUUID = UUID.randomUUID().toString

    // 创建 sparkConf
    val sparkConf = new SparkConf().setAppName("product").setMaster("local[*]")

    // 创建 sparkSession(包含 sparkContext)
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // ******************** 需求六:各区域 Top3 商品统计 ********************

    // 获取 用户访问行为的数据 (cityId, clickProductId)
    // cityId2ProductIdRDD: RDD[(cityId, clickProductId)]
    val cityId2ProductIdRDD = getCityAndProductInfo(sparkSession, taskParam)

    // 获取 城市信息 (城市 ID,城市名称,区域名称)
    // cityId2AreaInfoRDD: RDD[(cityId, CityAreaInfo)]
    val cityId2AreaInfoRDD = getCityAreaInfo(sparkSession)

    // 做 Join 操作,得到 (city_id, city_name, area, click_product_id)
    // 临时表 temp_area_product_info 中的一条数据就代表一次点击商品的行为
    getAreaProductIdBasicInfoTable(sparkSession, cityId2ProductIdRDD, cityId2AreaInfoRDD)

    // 自定义 UDF 函数:实现字符串带去重的拼接
    sparkSession.udf.register("concat_long_string", (v1: Long, v2: String, split: String) => {
      v1 + split + v2
    })
    sparkSession.udf.register("group_concat_distinct", new GroupConcatDistinct)

    // 统计 每一个区域里每一个商品被点击的次数,得到 (area, click_product_id, click_count, city_infos)
    getAreaProductClickCountTable(sparkSession)

    // 自定义 UDAF 函数:实现从 json 串中取出指定字段的值
    sparkSession.udf.register("get_json_field", (json: String, field: String) => {
      val jsonObject = JSONObject.fromObject(json)
      jsonObject.getString(field)
    })

    // 将 temp_area_product_count 表 join 商品信息表 product_info
    getAreaProductClickCountInfo(sparkSession)

    // 获取 各区域 Top3 商品(使用到了开窗函数)
    getAreaTop3Product(sparkSession, taskUUID)

    // 测试
    // sparkSession.sql("select * from temp_area_product_info").show
    // sparkSession.sql("select * from temp_area_product_count").show
    // sparkSession.sql("select * from temp_area_count_product_info").show
    // sparkSession.sql("select * from temp_test").show
  }

  /**
    * 获取 各区域 Top3 商品(使用了开窗函数)
    *
    * @param sparkSession
    * @param taskUUID
    * @return
    */
  def getAreaTop3Product(sparkSession: SparkSession, taskUUID: String) = {
//    val sql = "select area, city_infos, click_product_id, click_count, product_name, product_status, " +
//      "row_number() over(partition by area order by click_count desc) row_number from temp_area_count_product_info"
//    sparkSession.sql(sql).createOrReplaceTempView("temp_test") // 测试

    val sql = "select area, " +
      "case " +
      "when area='华北' or area='华东' then 'A_Level' " +
      "when area='华中' or area='华男' then 'B_Level' " +
      "when area='西南' or area='西北' then 'C_Level' " +
      "else 'D_Level' " +
      "end area_level, " +
      "city_infos, click_product_id, click_count, product_name, product_status from (" +
      "select area, city_infos, click_product_id, click_count, product_name, product_status, " +
      "row_number() over(partition by area order by click_count desc) row_number from temp_area_count_product_info) t where row_number <= 3"
    val areaTop3ProductRDD = sparkSession.sql(sql).rdd.map {
      case row =>
        AreaTop3Product(taskUUID, row.getAs[String]("area"), row.getAs[String]("area_level"),
          row.getAs[Long]("click_product_id"), row.getAs[String]("city_infos"),
          row.getAs[Long]("click_count"), row.getAs[String]("product_name"),
          row.getAs[String]("product_status"))
    }

    import sparkSession.implicits._
    areaTop3ProductRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("dbtable", "area_top3_product")
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .mode(SaveMode.Append)
      .save()
  }

  /**
    * 将 temp_area_product_count 表 join 商品信息表
    *
    * @param sparkSession
    */
  def getAreaProductClickCountInfo(sparkSession: SparkSession): Unit = {
    // temp_area_product_count: (area, click_product_id, click_count, city_infos) tapc
    // product_info: (product_id, product_name, extend_info)  pi
    val sql = "select tapc.area, tapc.city_infos, tapc.click_product_id, tapc.click_count, pi.product_name, " +
      "if (get_json_field(pi.extend_info, 'product_status') = '0', 'Self', 'Third Party') product_status" +
      " from temp_area_product_count tapc join product_info pi on tapc.click_product_id = pi.product_id"

    sparkSession.sql(sql).createOrReplaceTempView("temp_area_count_product_info")
  }

  /**
    * 统计 每一个区域里每一个商品被点击的次数
    */
  def getAreaProductClickCountTable(sparkSession: SparkSession): Unit = {
    val sql = "select area, click_product_id, count(*) click_count," +
      " group_concat_distinct(concat_long_string(city_id, city_name, ':')) city_infos" +
      " from temp_area_product_info group by area, click_product_id"

    sparkSession.sql(sql).createOrReplaceTempView("temp_area_product_count")
  }

  /**
    * 将 用户访问行为的数据 (cityId, clickProductId) 和 城市信息 (城市 ID,城市名称,区域名称) 做 join 操作,得到所需的临时表数据
    *
    * @param sparkSession
    * @param cityId2ProductIdRDD
    * @param cityId2AreaInfoRDD
    */
  def getAreaProductIdBasicInfoTable(sparkSession: SparkSession,
                                     cityId2ProductIdRDD: RDD[(Long, Long)],
                                     cityId2AreaInfoRDD: RDD[(Long, CityAreaInfo)]): Unit = {
    val areaProductIdBasicInfoRDD = cityId2ProductIdRDD.join(cityId2AreaInfoRDD).map {
      case (cityId, (clickProductId, cityAreaInfo)) =>
        (cityId, cityAreaInfo.city_name, cityAreaInfo.area, clickProductId)
    }

    import sparkSession.implicits._
    // 转换为临时表的时候需要指定字段的名称
    areaProductIdBasicInfoRDD.toDF("city_id", "city_name", "area", "click_product_id").createOrReplaceTempView("temp_area_product_info")
  }

  /**
    * 获取 城市信息(城市 ID,城市名称,区域名称)
    *
    * @param sparkSession
    */
  def getCityAreaInfo(sparkSession: SparkSession) = {
    val cityAreaInfoArray = Array((0L, "北京", "华北"), (1L, "上海", "华东"), (2L, "南京", "华东"),
      (3L, "广州", "华南"), (4L, "三亚", "华南"), (5L, "武汉", "华中"),
      (6L, "长沙", "华中"), (7L, "西安", "西北"), (8L, "成都", "西南"),
      (9L, "哈尔滨", "东北"))

    // RDD[(cityId, CityAreaInfo)]
    sparkSession.sparkContext.makeRDD(cityAreaInfoArray).map {
      case (cityId, cityName, area) =>
        (cityId, CityAreaInfo(cityId, cityName, area))
    }
  }

  /**
    * 获取 用户访问行为的数据 (city_id, click_product_id)
    *
    * @param sparkSession
    * @param taskParam
    * @return
    */
  def getCityAndProductInfo(sparkSession: SparkSession, taskParam: JSONObject) = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    // 只获取发生过点击的 action 的数据,获取到的一条 action 数据就代表一个点击行为
    val sql = "select city_id, click_product_id from user_visit_action where date>='" + startDate +
      "' and date<='" + endDate + "' and click_product_id != -1"

    import sparkSession.implicits._
    sparkSession.sql(sql).as[CityClickProduct].rdd.map {
      case cityIdAndProductId =>
        (cityIdAndProductId.city_id, cityIdAndProductId.click_product_id)
    }
  }
}

自定义弱类型的 UDAF 函数

代码语言:javascript
复制
package com.atguigu.product

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

/**
  * 自定义弱类型的 UDAF 函数
  */
class GroupConcatDistinct extends UserDefinedAggregateFunction {
  // 设置 UDAF 函数的输入类型为 String
  override def inputSchema: StructType = StructType(StructField("cityInfoInput", StringType) :: Nil)

  // 设置 UDAF 函数的缓冲区类型为 String
  override def bufferSchema: StructType = StructType(StructField("cityInfoBuffer", StringType) :: Nil)

  // 设置 UDAF 函数的输出类型为 String
  override def dataType: DataType = StringType

  // 设置 UDAF 函数的输入数据和输出数据是一致的
  override def deterministic: Boolean = true

  // 初始化自定义的 UDAF 函数
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }

  // 设置 UDAF 函数的缓冲区更新:实现一个字符串带去重的拼接
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    var cityInfoBuffer = buffer.getString(0)
    val cityInfoInput = input.getString(0)

    if (!cityInfoBuffer.contains(cityInfoInput)) {
      if ("".equals(cityInfoBuffer)) {
        cityInfoBuffer += cityInfoInput
      } else {
        cityInfoBuffer += "," + cityInfoInput
      }
    }

    buffer.update(0, cityInfoBuffer)
  }

  // 把两个自定义的 UDAF 函数的值合并在一起
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // cityInfoBuffer1: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
    var cityInfoBuffer1 = buffer1.getString(0)
    // cityInfoBuffer2: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ...
    val cityInfoBuffer2 = buffer2.getString(0)

    // 将 cityInfoBuffer2 中的数据带去重的加入到 cityInfoBuffer1 中
    for (cityInfo <- cityInfoBuffer2.split(",")) {
      if (!cityInfoBuffer1.contains(cityInfo)) {
        if ("".equals(cityInfoBuffer1)) {
          cityInfoBuffer1 += cityInfo
        } else {
          cityInfoBuffer1 += "," + cityInfo
        }
      }
    }

    buffer1.update(0, cityInfoBuffer1)
  }

  // 返回结果
  override def evaluate(buffer: Row): Any = {
    buffer.getString(0)
  }
}

AreaTop3Product 类

代码语言:javascript
复制
/**
  * 各省 top3 热门广告
  *
  * @author
  *
  */
case class AdProvinceTop3(date: String,
                          province: String,
                          adid: Long,
                          clickCount: Long)
5.6.8 需求六实现思路整理

5.7 需求七:广告点击黑名单实时统计

广告流量实时统计【概览】

5.7.1 需求解析

  实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。

5.7.2 数据源解析

Kafka 数据:timestamp province city userid adid

5.7.3 数据结构解析

5.7.4 需求实现简要流程
5.7.5 需求实现详细流程

5.7.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `ad_blacklist`
-- ----------------------------
DROP TABLE IF EXISTS `ad_blacklist`; 
CREATE TABLE `ad_blacklist` (
  `userid` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
--  Table structure for `ad_user_click_count`
-- ----------------------------
DROP TABLE IF EXISTS `ad_user_click_count`; 
CREATE TABLE `ad_user_click_count` (
  `date` varchar(30) DEFAULT NULL,
  `userid` int(11) DEFAULT NULL,
  `adid` int(11) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.7.7 代码解析

在 analyse 中新建子模块 advertising,配置 pom.xml 文件,添加 scala 框架的支持

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>analyse</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>advertising</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.atguigu</groupId>
            <artifactId>commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!-- Spark 的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </dependency>

        <!-- 引入 Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- scala-maven-plugin 插件用于在任意的 maven 项目中对 scala 代码进行编译/测试/运行/文档化 -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.stream.AdClickRealTimeStat</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

准备工作:数据生成与数据消费测试

1、先启动 zookeeper 集群,再启动 kafka 集群

代码语言:javascript
复制
[atguigu@hadoop102 ~]$ zkstart.sh
[atguigu@hadoop102 ~]$ kafka-start.sh 

2、启动一个 kafka 消费者进程

代码语言:javascript
复制
老版本:消费者会将自己的 offset 文件保存在 zookeeper(低版本的kafka)。所以消费者连接的是 zookeeper。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic AdRealTimeLog1

新版本:消费者会将自己的 offset 文件保存在 kafka 集群中(高版本的kafka)。所以消费者连接的是 kafka。这样做的好处是:提高了效率,减少了网络传输。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic AdRealTimeLog1

3、启动数据生成程序 MockRealTimeData

代码语言:javascript
复制
package com.atguigu.stream

import commons.conf.ConfigurationManager
import commons.constant.Constants
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AdClickRealTimeStat {

  def main(args: Array[String]): Unit = {
    // 构建 Spark 上下文
    val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")

    // 创建 Spark 客户端
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))

    // 设置检查点目录
    ssc.checkpoint("./streaming_checkpoint")

    // 获取 kafka 的配置
    val kafka_brokers = ConfigurationManager.config.getString(Constants.KAFKA_BROKERS)
    val kafka_topics = ConfigurationManager.config.getString(Constants.KAFKA_TOPICS)

    // kafka 消费者参数配置
    val kafkaParam = Map(
      "bootstrap.servers" -> kafka_brokers, // 用于初始化连接到集群的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "commerce-consumer-group", // 用于标识这个消费者属于哪个消费团体(组)
      "auto.offset.reset" -> "latest", // 如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性,latest 表示自动重置偏移量为最新的偏移量
      "enable.auto.commit" -> (false: java.lang.Boolean) // 如果是 true,则这个消费者的偏移量会在后台自动提交
    )

    // 创建 DStream,返回接收到的输入数据
    // LocationStrategies:                  根据给定的主题和集群地址创建 consumer
    // LocationStrategies.PreferConsistent: 持续的在所有 Executor 之间均匀分配分区(即把 Executor 当成 Kafka Consumer),常用该方式
    // ConsumerStrategies:                  选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
    // ConsumerStrategies.Subscribe:        订阅一系列主题
    // adRealTimeLogDStream: DStream[RDD, RDD, RDD, ...] -> RDD[Message] -> Message: key value
    val adRealTimeLogDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(kafka_topics), kafkaParam))

    // 取出 DStream 里面每一条数据的 value 值
    // adRealTimeLogDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
    val adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())

    // 用于 Kafka Stream 的线程非安全问题,重新分区切断血统
    // adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)

    // 根据黑名单进行数据过滤
    // 刚刚接受到原始的用户点击行为日志之后
    // 根据 mysql 中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)
    // 使用 transform 算子(将 dstream 中的每个 batch RDD 进行处理,转换为任意的其他 RDD,功能很强大)
    val adRealTimeFilterDStream = adRealTimeValueDStream.transform {
      consumerRecordRDD =>
        // 首先从 MySQL 中查询所有黑名单用户
        // adBlacklists: Array[AdBlacklist]  AdBlacklist: userId
        val adBlacklistArray = AdBlacklistDAO.findAll()

        // userIdArray: Array[Long]  [userId1, userId2, ...]
        val userIdArray = adBlacklistArray.map(item => item.userid)

        consumerRecordRDD.filter {
          // consumerRecord: timestamp province city userid adid
          case consumerRecord =>
            val consumerRecordSplit = consumerRecord.split(" ")
            val userId = consumerRecordSplit(3).toLong
            !userIdArray.contains(userId)
        }
    }

    adRealTimeFilterDStream.foreachRDD(rdd => rdd.foreach(println(_)))

    ssc.start()
    ssc.awaitTermination()
  }
}

4、启动数据消费程序 AdClickRealTimeStat

代码语言:javascript
复制
import java.util.Properties

import commons.conf.ConfigurationManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockRealTimeData {

  def main(args: Array[String]): Unit = {
    // 获取配置文件 commerce.properties 中的 Kafka 配置参数
    val broker = ConfigurationManager.config.getString("kafka.broker.list")
    val topic = ConfigurationManager.config.getString("kafka.topics")

    // 创建 Kafka 生产者
    val kafkaProducer = createKafkaProducer(broker)

    while (true) {
      // 随机产生实时数据并通过 Kafka 生产者发送到 Kafka 集群中
      for (item <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, item))
      }
      Thread.sleep(5000)
    }
  }

  /**
    * 实时模拟数据的生成
    *
    * 时间点: 当前时间毫秒
    * userId: 0 - 99
    * 省份、城市 ID 相同: 1 - 9
    * adid: 0 - 19
    * ((0L,"北京","北京"),(1L,"上海","上海"),(2L,"南京","江苏省"),(3L,"广州","广东省"),(4L,"三亚","海南省"),(5L,"武汉","湖北省"),(6L,"长沙","湖南省"),(7L,"西安","陕西省"),(8L,"成都","四川省"),(9L,"哈尔滨","东北省"))
    *
    * 格式 :timestamp province city userid adid
    *       某个时间点 某个省份 某个城市 某个用户 某个广告
    */
  def generateMockData(): Array[String] = {
    val array = ArrayBuffer[String]()
    val random = new Random()

    // 模拟实时数据:timestamp province city userid adid
    for (i <- 0 until 50) {
      val timestamp = System.currentTimeMillis()
      val province = random.nextInt(10)
      val city = province
      val adid = random.nextInt(20)
      val userid = random.nextInt(100)

      // 拼接实时数据
      array += timestamp + " " + province + " " + city + " " + userid + " " + adid
    }

    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // 创建配置对象
    val prop = new Properties()
    // 添加配置
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // 根据配置创建 Kafka 生产者
    new KafkaProducer[String, String](prop)
  }
}

5、查看控制台消费者的输出 以及 Linux 上的 kafka 消费者的输出,没有问题!下面实现具体的需求。

业务代码

代码语言:javascript
复制
  /**
    * 需求七:广告点击黑名单实时统计
    *
    * @param adRealTimeFilterDStream
    */
  def generateBlackListStat(adRealTimeFilterDStream: DStream[String]) = {
    // adRealTimeFilterDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
    // key2NumDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key, 1L)]
    val key2NumDStream = adRealTimeFilterDStream.map {
      // consumerRecordRDD: timestamp province city userid adid
      case consumerRecordRDD =>
        val consumerRecordSplit = consumerRecordRDD.split(" ")
        val timestamp = consumerRecordSplit(0).toLong
        // dateKey: yyyyMMdd
        val dateKey = DateUtils.formatDateKey(new Date(timestamp))
        val userid = consumerRecordSplit(3).toLong
        val adid = consumerRecordSplit(4).toLong

        val key = dateKey + "_" + userid + "_" + adid // 组合 key

        (key, 1L)
    }

    // key2CountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key, 25)]
    val key2CountDStream = key2NumDStream.reduceByKey(_ + _)

    // 根据每一个 RDD 里面的数据,更新 用户点击次数表 数据
    key2CountDStream.foreachRDD {
      rdd => rdd.foreachPartition {
        items =>
          val clickCountArray = new ArrayBuffer[AdUserClickCount]()

          for ((key, count) <- items) {
            // 切割数据
            val keySplit = key.split("_")

            // 取出数据
            val date = keySplit(0)
            val userid = keySplit(1).toLong
            val adid = keySplit(2).toLong

            // 封装数据,并放入 ArrayBuffer 中
            clickCountArray += AdUserClickCount(date, userid, adid, count)
          }

          // 更新 MySQl 数据库中表的数据
          AdUserClickCountDAO.updateBatch(clickCountArray.toArray)
      }
    }

    // 对 DStream 做 filter 操作:就是遍历 DStream 中的每一个 RDD 中的每一条数据
    // key2BlackListDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key, 150)]
    val key2BlackListDStream = key2CountDStream.filter {
      case (key, count) =>
        // 切割数据
        val keySplit = key.split("_")

        // 取出数据
        val date = keySplit(0)
        val userid = keySplit(1).toLong
        val adid = keySplit(2).toLong

        val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)

        if (clickCount > 100) {
          true // 留下
        } else {
          false // 过滤掉
        }
    }

    // key2BlackListDStream.map: DStream[RDD[userid]]
    val userIdDStream = key2BlackListDStream.map {
      case (key, count) =>
        key.split("_")(1).toLong
    }.transform(rdd => rdd.distinct()) // 转换 key 并去重

    // 将结果批量插入 MySQL 数据库中
    userIdDStream.foreachRDD {
      rdd => rdd.foreachPartition {
        items =>
          val userIdArray = new ArrayBuffer[AdBlacklist]()

          for (userId <- items) {
            userIdArray += AdBlacklist(userId)
          }

          AdBlacklistDAO.insertBatch(userIdArray.toArray)
      }
    }
  }

AdBlacklist 类

代码语言:javascript
复制
/**
  * 广告黑名单
  *
  * @author
  *
  */
case class AdBlacklist(userid: Long)

/**
  * 用户广告点击量
  *
  * @author
  *
  */
case class AdUserClickCount(date: String,
                            userid: Long,
                            adid: Long,
                            clickCount: Long)
5.7.8 需求七实现思路整理

5.8 需求八:各省各城市广告点击量实时统计

5.8.1 需求解析

5.8.2 数据源解析

Kafka 数据:timestamp province city userid adid

5.8.3 数据结构解析

5.8.4 需求实现简要流程
5.8.5 需求实现详细流程

5.8.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `ad_stat`
-- ----------------------------
DROP TABLE IF EXISTS `ad_stat`; 
CREATE TABLE `ad_stat` (
  `date` varchar(30) DEFAULT NULL,
  `province` varchar(100) DEFAULT NULL,
  `city` varchar(100) DEFAULT NULL,
  `adid` int(11) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.8.7 代码解析

业务代码

代码语言:javascript
复制
  /**
    * 需求八:各省各城市广告点击量实时统计(使用累计统计)
    *
    * @param adRealTimeFilterDStream
    */
  def provinceCityClickStat(adRealTimeFilterDStream: DStream[String]) = {
    // adRealTimeFilterDStream: DStream[RDD, RDD, RDD, ...] -> RDD[String] -> String: timestamp province city userid adid
    // key2NumDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key, 1L)]
    val key2NumDStream = adRealTimeFilterDStream.map {
      case consumerRecordRDD =>
        val consumerRecordSplit = consumerRecordRDD.split(" ")
        val timestamp = consumerRecordSplit(0).toLong
        // dateKey: yyyyMMdd
        val dateKey = DateUtils.formatDateKey(new Date(timestamp))
        val province = consumerRecordSplit(1)
        val city = consumerRecordSplit(2)
        val adid = consumerRecordSplit(4)

        val key = dateKey + "_" + province + "_" + city + "_" + adid // 组合 key

        (key, 1L)
    }

    // 执行 updateStateByKey 操作(全局的累计性的操作)
    val key2StateDStream = key2NumDStream.updateStateByKey[Long] {
      (values: Seq[Long], state: Option[Long]) =>
        var newValue = 0L

        if (state.isDefined) {
          newValue = state.get
        }

        for (value <- values) {
          newValue += value
        }

        Some(newValue)
    }

    // 将结果批量插入 MySQL 数据库中
    key2StateDStream.foreachRDD {
      rdd =>
        rdd.foreachPartition {
          items =>
            val adStatArray = new ArrayBuffer[AdStat]()

            for ((key, count) <- items) {
              // 切割数据
              val keySplit = key.split("_")

              // 取出数据
              val date = keySplit(0)
              val province = keySplit(1)
              val city = keySplit(2)
              val adid = keySplit(3).toLong

              // 封装数据,并放入 ArrayBuffer 中
              adStatArray += AdStat(date, province, city, adid, count)
            }

            AdStatDAO.updateBatch(adStatArray.toArray)
        }
    }
  }

AdStat 类

代码语言:javascript
复制
/**
  * 广告实时统计
  *
  * @author
  *
  */
case class AdStat(date: String,
                  province: String,
                  city: String,
                  adid: Long,
                  clickCount: Long)
5.8.8 需求八实现思路整理

5.9 需求九:每天每个省份 Top3 热门广告

5.9.1 需求解析

5.9.2 数据源解析

数据来源于需求八 updateStateByKey 得到的 DStream。

5.9.3 数据结构解析

Dstream[(dateKey_province_city_adid, count)]

5.9.4 需求实现简要流程
5.9.5 需求实现详细流程

5.9.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `ad_province_top3`
-- ----------------------------
DROP TABLE IF EXISTS `ad_province_top3`; 
CREATE TABLE `ad_province_top3` (
  `date` varchar(30) DEFAULT NULL,
  `province` varchar(100) DEFAULT NULL,
  `adid` int(11) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.9.7 代码解析

业务代码

代码语言:javascript
复制
  /**
    * 需求九:每天每个省份 Top3 热门广告
    *
    * @param spark
    * @param key2ProvinceCityCountDStream
    */
  def provinceTop3AdverStat(spark: SparkSession, key2ProvinceCityCountDStream: DStream[(String, Long)]) = {
    // key2ProvinceCityCountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(key, count)]
    // key: date_province_city_adid

    // 转换 key
    // key2ProvinceCountDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(newKey, count)]
    // newKey: date_province_adid
    val key2ProvinceCountDStream = key2ProvinceCityCountDStream.map {
      case (key, count) =>
        val keySplit = key.split("_")
        val date = keySplit(0)
        val province = keySplit(1)
        val adid = keySplit(3)

        val newKey = date + "_" + province + "_" + adid // 组合 key

        (newKey, count)
    }

    // 聚合新 key
    val key2ProvinceAggrCountDStream = key2ProvinceCountDStream.reduceByKey(_ + _)

    val top3DStream = key2ProvinceAggrCountDStream.transform {
      rdd =>
        // rdd: RDD[(newKey, count)]  newKey: date_province_adid

        // 转化为新的 RDD
        val basicDataRDD = rdd.map {
          case (key, count) =>
            val keySplit = key.split("_")
            val date = keySplit(0)
            val province = keySplit(1)
            val adid = keySplit(2).toLong

            (date, province, adid, count)
        }

        // 将 RDD 转化为 DF
        import spark.implicits._
        basicDataRDD.toDF("date", "province", "adid", "count").createOrReplaceTempView("temp_basic_info")

        // 使用 Spark SQL 执行 SQL 语句,配合开窗函数,统计出各省份 top3 热门的广告
        val sql = "select date, province, adid, count from (" +
          "select date, province, adid, count, " +
          "row_number() over(partition by date, province order by count desc) row_number from temp_basic_info) t " +
          "where row_number <= 3"

        spark.sql(sql).rdd
    }

    top3DStream.foreachRDD {
      // rdd: RDD[Row]
      rdd =>
        rdd.foreachPartition {
          // items: Row
          items =>
            val top3Array = new ArrayBuffer[AdProvinceTop3]()

            for (item <- items) {
              // 取出数据
              val date = item.getAs[String]("date")
              val province = item.getAs[String]("province")
              val adid = item.getAs[Long]("adid")
              val count = item.getAs[Long]("count")

              // 封装数据
              top3Array += AdProvinceTop3(date, province, adid, count)
            }

            // 写入 MySQL 数据库中
            AdProvinceTop3DAO.updateBatch(top3Array.toArray)
        }
    }
  }

AdProvinceTop3 类

代码语言:javascript
复制
/**
  * 各省 top3 热门广告
  *
  * @author
  *
  */
case class AdProvinceTop3(date: String,
                          province: String,
                          adid: Long,
                          clickCount: Long)
5.9.8 需求九实现思路整理

5.10 需求十:最近一小时广告点击量实时统计

5.10.1 需求解析

  统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量。

5.10.2 数据源解析

5.10.3 数据结构解析

5.10.4 需求实现简要流程
5.10.5 需求实现详细流程

5.10.6 MySQL 存储结构解析
代码语言:javascript
复制
-- ----------------------------
--  Table structure for `ad_click_trend`
-- ----------------------------
DROP TABLE IF EXISTS `ad_click_trend`; 
CREATE TABLE `ad_click_trend` (
  `date` varchar(30) DEFAULT NULL,
  `hour` varchar(30) DEFAULT NULL,
  `minute` varchar(30) DEFAULT NULL,
  `adid` int(11) DEFAULT NULL,
  `clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.10.7 代码解析

业务代码

代码语言:javascript
复制
  /**
    * 需求十:计算最近 1 小时滑动窗口内的广告点击趋势
    *
    * @param adRealTimeFilterDStream
    * @return
    */
  def recentHourAdverClickStat(adRealTimeFilterDStream: DStream[String]) = {
    val key2TimeMinuteDStream = adRealTimeFilterDStream.map {
      // consumerRecordRDD: timestamp province city userid adid
      case consumerRecordRDD =>
        val consumerRecordSplit = consumerRecordRDD.split(" ")
        val timestamp = consumerRecordSplit(0).toLong

        // timeMinute: yyyyMMddHHmm
        val timeMinute = DateUtils.formatTimeMinute(new Date(timestamp))
        val adid = consumerRecordSplit(4).toLong

        val key = timeMinute + "_" + adid // 组合 key

        (key, 1L)
    }

    // 第一个 Minutes 表示 窗口大小,第二个 Minutes 表示 窗口滑动步长
    val key2WindowDStream = key2TimeMinuteDStream.reduceByKeyAndWindow((a: Long, b: Long) => (a + b), Minutes(60), Minutes(1))

    key2WindowDStream.foreachRDD {
      rdd => rdd.foreachPartition {
        // items: (key, count)
        items =>
          val trendArray = ArrayBuffer[AdClickTrend]()

          for ((key, count) <- items) {
            val keySplit = key.split("_")

            // 切割数据
            // timeMinute: yyyyMMddHHmm
            val timeMinute = keySplit(0)

            // 获取数据
            val date = timeMinute.substring(0, 8)   // 包头不包尾,注意是索引
            val hour = timeMinute.substring(8, 10)  // 包头不包尾,注意是索引
            val minute = timeMinute.substring(10)   // 包头不包尾,注意是索引

            val adid = keySplit(1).toLong

            // 封装数据
            trendArray += AdClickTrend(date, hour, minute, adid, count)
          }

          // 写入 MySQL 数据库中
          AdClickTrendDAO.updateBatch(trendArray.toArray)
      }
    }
  }

AdClickTrend 类

代码语言:javascript
复制
/**
  * 广告点击趋势
  *
  * @author
  *
  */
case class AdClickTrend(date: String,
                        hour: String,
                        minute: String,
                        adid: Long,
                        clickCount: Long)
5.10.8 需求七、八、九、十实现思路整理

第6章 项目总结

  本项目通过 Spark 技术生态栈中的 Spark Core、Spark SQL 和 Spark Streaming三个技术框架,实现了对电商平台业务的离线和实时数据统计与分析,完成了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计 4 个业务模块的开发工作。   本项目涵盖了 Spark Core、Spark SQL 和 Spark Streaming 三个技术框架中核心的知识点与技术点,对于同学们真正的理解和掌握 Spark 技术生态栈有着良好的促进作用。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第1章 项目概述
    • 1.1 项目简介
      • 1.2 项目目标
        • 1.3 业务需求简介
          • 1.3.1 用户访问 session 统计
          • 1.3.2 页面单跳转化率统计
          • 1.3.3 区域热门商品离线统计
          • 1.3.4 广告流量实时统计
      • 第2章 项目主体架构
        • 2.1 项目架构
          • 2.2 离线日志采集宏观流程(参考)
            • 2.3 实时日志采集宏观流程(参考)
              • 2.4 离线/实时日志采集框架
              • 第3章 模拟业务数据源
                • 3.1 离线数据
                  • 3.1.1 数据模型与数据说明
                • 3.2 实时数据
                  • 3.2.1 数据模型与数据说明
              • 第4章 程序框架解析
                • 4.1 mock 模块(模拟数据产生模块)
                  • 4.2 commons 模块(公共模块)
                    • 4.3 analyse 模块(数据分析模块)
                    • 第5章 需求解析
                      • 5.1 需求一:Session 各范围访问步长、访问时长占比统计
                        • 5.1.1 需求解析
                        • 5.1.2 数据源解析
                        • 5.1.3 数据结构解析
                        • 5.1.4 需求实现简要流程
                        • 5.1.5 需求实现详细流程
                        • 5.1.6 MySQL 存储结构解析
                        • 5.1.7 代码解析
                        • 5.1.8 需求一实现思路整理
                      • 5.2 需求二:Session 随机抽取
                        • 5.2.1 需求解析
                        • 5.2.2 数据源解析
                        • 5.2.3 数据结构解析
                        • 5.2.4 需求实现简要流程
                        • 5.2.5 需求实现详细流程
                        • 5.2.6 MySQL 存储结构解析
                        • 5.2.7 代码解析
                        • 5.2.8 需求二实现思路整理
                      • 5.3 需求三:Top10 热门品类统计
                        • 5.3.1 需求解析
                        • 5.3.2 数据源解析
                        • 5.3.3 数据结构解析
                        • 5.3.4 需求实现简要流程
                        • 5.3.5 需求实现详细流程
                        • 5.3.6 MySQL 存储结构解析
                        • 5.3.7 代码解析
                        • 5.3.8 需求三实现思路整理
                      • 5.4 需求四:Top10 热门品类的 Top10 活跃 Session 统计
                        • 5.4.1 需求解析
                        • 5.4.2 数据源解析
                        • 5.4.3 数据结构解析
                        • 5.4.4 需求实现简要流程
                        • 5.4.5 需求实现详细流程
                        • 5.4.6 MySQL 存储结构解析
                        • 5.4.7 代码解析
                        • 5.4.8 需求三、四实现思路整理
                      • 5.5 需求五:页面单跳转化率统计
                        • 5.5.1 需求解析
                        • 5.5.2 数据源解析
                        • 5.5.3 数据结构解析
                        • 5.5.4 需求实现简要流程
                        • 5.5.5 需求实现详细流程
                        • 5.5.6 MySQL 存储结构解析
                        • 5.5.7 代码解析
                        • 5.5.8 需求五实现思路整理
                      • 5.6 需求六:各区域 Top3 商品统计
                        • 5.6.1 需求解析
                        • 5.6.2 数据源解析
                        • 5.6.3 数据结构解析
                        • 5.6.4 需求实现简要流程
                        • 5.6.5 需求实现详细流程
                        • 5.6.6 MySQL 存储结构解析
                        • 5.6.7 代码解析
                        • 5.6.8 需求六实现思路整理
                      • 5.7 需求七:广告点击黑名单实时统计
                        • 5.7.1 需求解析
                        • 5.7.2 数据源解析
                        • 5.7.3 数据结构解析
                        • 5.7.4 需求实现简要流程
                        • 5.7.5 需求实现详细流程
                        • 5.7.6 MySQL 存储结构解析
                        • 5.7.7 代码解析
                        • 5.7.8 需求七实现思路整理
                      • 5.8 需求八:各省各城市广告点击量实时统计
                        • 5.8.1 需求解析
                        • 5.8.2 数据源解析
                        • 5.8.3 数据结构解析
                        • 5.8.4 需求实现简要流程
                        • 5.8.5 需求实现详细流程
                        • 5.8.6 MySQL 存储结构解析
                        • 5.8.7 代码解析
                        • 5.8.8 需求八实现思路整理
                      • 5.9 需求九:每天每个省份 Top3 热门广告
                        • 5.9.1 需求解析
                        • 5.9.2 数据源解析
                        • 5.9.3 数据结构解析
                        • 5.9.4 需求实现简要流程
                        • 5.9.5 需求实现详细流程
                        • 5.9.6 MySQL 存储结构解析
                        • 5.9.7 代码解析
                        • 5.9.8 需求九实现思路整理
                      • 5.10 需求十:最近一小时广告点击量实时统计
                        • 5.10.1 需求解析
                        • 5.10.2 数据源解析
                        • 5.10.3 数据结构解析
                        • 5.10.4 需求实现简要流程
                        • 5.10.5 需求实现详细流程
                        • 5.10.6 MySQL 存储结构解析
                        • 5.10.7 代码解析
                        • 5.10.8 需求七、八、九、十实现思路整理
                    • 第6章 项目总结
                    相关产品与服务
                    云数据库 SQL Server
                    腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档