前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)

Spark Core项目实战(1) | 准备数据与计算Top10 热门品类(附完整项目代码及注释)

作者头像
不温卜火
发布2020-10-28 16:56:25
9450
发布2020-10-28 16:56:25
举报
文章被收录于专栏:不温卜火

一. 准备数据

  本实战项目的数据是采集自电商的用户行为数据.   主要包含用户的 4 种行为: 搜索, 点击, 下单和支付.   数据格式如下, 不同的字段使用下划线分割开_:

  • 1. 数据
3
3
  • 2. 数据说明
  1. 数据采用_分割字段
  2. 每一行表示用户的一个行为, 所以每一行只能是四种行为中的一种.
  3. 如果搜索关键字是 null, 表示这次不是搜索
  4. 如果点击的品类 id 和产品 id 是 -1 表示这次不是点击
  5. 下单行为来说一次可以下单多个产品, 所以品类 id 和产品 id 都是多个, id 之间使用逗号,分割. 如果本次不是下单行为, 则他们相关数据用null来表示
  6. 支付行为和下单行为类似.

二. Top10 热门品类

1. 简介

  品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.

2. 思路

  • 1.思路 1

  分别统计每个品类点击的次数, 下单的次数支付的次数.   缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常的耗时

  • 2. 思路 2

  最好的办法应该是遍历一次能够计算出来上述的 3 个指标.   使用累加器可以达成我们的需求.

  1. 遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器
  2. 定义累加器
  3. 当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
  4. 遍历完成之后就得到每个每个品类 id 和操作类型的数量.
  5. 按照点击下单支付的顺序来排序
  6. 取出 Top10
4
4

三. 具体实现

1. 前提准备

  • 1.创建新module
5
5
  • 2.添加依赖
代码语言:javascript
复制
    <artifactId>spark-core-project</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

    </dependencies>
    
    <build>
        <plugins>
            <!-- 打包插件, 否则 scala 类不会编译并打包进去 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • 3. 新建package
6
6

2.建立项目APP

  • 1. 测试看是否能够读取数据
代码语言:javascript
复制
package com.buwenbuhuo.spark.core.project

import org.apache.spark.{SparkConf, SparkContext}

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-07-28 14:20
 **
 *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
 */
object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 把数据从文件读出来
    val sourceRDD = sc.textFile("D:/user_visit_action.txt")

    // 把数据封装好(封装到样例类中)
     sourceRDD.collect.foreach(println)

    // 需求1:


    // 关闭项目(sc)
    sc.stop()
    
  }

}
  • 运行成功结果
7
7

3. 完整项目代码

  • 1. 封装用户行为的bean类UserVisitAction
代码语言:javascript
复制
package com.buwenbuhuo.spark.core.project.bean

/**
 **
@author 不温卜火
 **
 * @create 2020-07-28 15:25
 **
 *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
 */
/**
 * 用户访问动作表
 *
 * @param date               用户点击行为的日期
 * @param user_id            用户的ID
 * @param session_id         Session的ID
 * @param page_id            某个页面的ID
 * @param action_time        动作的时间点
 * @param search_keyword     用户搜索的关键词
 * @param click_category_id  某一个商品品类的ID
 * @param click_product_id   某一个商品的ID
 * @param order_category_ids 一次订单中所有品类的ID集合
 * @param order_product_ids  一次订单中所有商品的ID集合
 * @param pay_category_ids   一次支付中所有品类的ID集合
 * @param pay_product_ids    一次支付中所有商品的ID集合
 * @param city_id            城市 id
 */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long)
// 封装最终写入到数据库的数据
case class CategorySession(categoryId: String,
                           sessionId: String,
                           clickCount: Long)
  • 2. 定义用到的累加器
代码语言:javascript
复制
package com.buwenbuhuo.spark.core.project.acc

import com.buwenbuhuo.spark.core.project.bean.UserVisitAction
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
  **
  * @author 不温卜火
  *         *
  * @create 2020-07-29 12:16
  **
  *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
  */
// in: UserVisitAction out : Map[(种类,“click”) -> count] (品类,"order") -> (品类,"pay") ,-> count
class CategoryAcc extends AccumulatorV2[UserVisitAction,mutable.Map[(String, String), Long]]{
  self =>   //自身类型
  private val map: mutable.Map[(String, String), Long] = mutable.Map[(String, String), Long]()
  // 判断累加器是否为“零”
  override def isZero: Boolean = map.isEmpty

  // 复制累加器
  override def copy(): AccumulatorV2[UserVisitAction, mutable.Map[(String, String), Long]] = {
    val acc: CategoryAcc = new CategoryAcc
    map.synchronized{
      acc.map ++= map    // 可变集合,不应该直接赋值,应该进行数据的复制
    }
    acc

  }

  // 重置累加器 这个方法调用完之后,isZero必须返回ture
  override def reset(): Unit = map.clear()  // 可变集合应该做一个清楚

  // 分区内累加
  override def add(v: UserVisitAction): Unit = {
    // 分别计算3个指标
    // 对不同的行为做不同的处理 if语句  或 匹配模式
    v match {
      // 点击行为
      case action if action.click_category_id != -1 =>
        // (cid,"click") -> 100
        val key:(String,String) = (action.click_category_id.toString, "click")
        map += key -> (map.getOrElse(key,0L) + 1L)

      // 下单行为  切出来的是字符串"null",不是空null
      case action if action.order_category_ids != "null" =>
        // 切出来这次下单的多个品类
        val cIds: Array[String] = action.order_category_ids.split(",")
        cIds.foreach(cid => {
          val key:(String,String) = (cid,"order")
          map += key -> (map.getOrElse(key,0L) + 1L)
        })

      // 支付行为
      case action if action.pay_category_ids != "null" =>
        // 切出来这次下单的多个品类
        val cIds: Array[String] = action.pay_category_ids.split(",")
        cIds.foreach(cid => {
          val key:(String,String) = (cid,"pay")
          map += key -> (map.getOrElse(key,0L) + 1L)
        })

      // 其他非正常情况,做任何处理
      case  _ =>
    }

  }


  // 分区间的合并
  override def merge(other: AccumulatorV2[UserVisitAction, mutable.Map[(String, String), Long]]): Unit = {
    // 把other中的map合并到this(self)的map中
    // 合并map

    other match {
      case o: CategoryAcc =>
        // 1. 遍历 other的map,然后把变量的导致和self的mao进行相加
        /*      o.map.foreach{
                case ((cid,action),count) =>
                  self.map += (cid,action) -> (self.map.getOrElse((cid,action),0L) + count)
              }*/

        // 2, 对other的map进行折叠,把结果都折叠到self的map中
        //  如果是可变map,则所有的变化都是在原集合中发生变化,最后的值可以不用再一次添加
        //  如果是不可变map,则计算的结果,必须重新赋值给原来的map变量
        self.map ++= o.map.foldLeft(self.map){
          case (map,(cidAction,count)) =>
            map += cidAction -> (map.getOrElse(cidAction,0L) + count)
            map
        }

      case  _=>
        throw new UnsupportedOperationException
    }


  }

  // 最终的返回值
  override def value: mutable.Map[(String, String), Long] = map


}
  • 3. 首页APP
代码语言:javascript
复制
package com.buwenbuhuo.spark.core.project.app

import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  **
*@author 不温卜火
  **
  * @create 2020-07-29 12:18
  **
  *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
  */
object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    // 把数据从文件读出来
    val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt")

    // 把数据封装好(封装到样例类中)
//    sourceRDD.collect.foreach(println)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
    val fields: Array[String] = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })

        // 需求1:
         CategoryTopApp.calcCatgoryTop10(sc , userVisitActionRDD)



    // 关闭项目(sc)
    sc.stop()

  }

}
  • 4. 计算Top10 热门品类的具体代码
代码语言:javascript
复制
package com.buwenbuhuo.spark.core.project.app

import com.buwenbuhuo.spark.core.project.acc.CategoryAcc
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.collection.mutable

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-07-29 13:21
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object CategoryTopApp {
  def calcCatgoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]): List[CategoryCountInfo] =  {
      // 使用累加器完成3个指标的累加: 点击 下单量 支付量
      val acc: CategoryAcc = new CategoryAcc
      sc.register(acc)
      userVisitActionRDD.foreach(action => acc.add(action))

      // 1. 把一个品类的三个指标封装到一个map中
      val cidActionCountGrouped: Map[String, mutable.Map[(String, String), Long]] = acc.value.groupBy(_._1._1)

      // 2. 转换成 CategoryCountInfo 类型的集合, 方便后续处理
      val categoryCountInfoArray: List[CategoryCountInfo] = cidActionCountGrouped.map {
        case (cid, map) =>
          CategoryCountInfo(cid,
            map.getOrElse((cid, "click"), 0L),
            map.getOrElse((cid, "order"), 0L),
            map.getOrElse((cid, "pay"), 0L)
          )
      }.toList

      // 3. 对数据进行排序取top10
      val result: List[CategoryCountInfo] = categoryCountInfoArray.sortBy(info => (-info.clickCount, -info.orderCount, -info.payCount))
        .take(10)

      // 4. 返回top10品类
      result

    }
}
/*
利用累加器完成
 */
  • 5. 运行结果
8
8

  本次的分享就到这里了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 准备数据
  • 二. Top10 热门品类
    • 1. 简介
      • 2. 思路
      • 三. 具体实现
        • 1. 前提准备
          • 2.建立项目APP
            • 3. 完整项目代码
            相关产品与服务
            智能数据分析
            腾讯云智能数据分析 Intellectual Data Analysis 是新一代云原生大数据敏捷分析解决方案。产品具备存算分离、动态扩缩容等特点,并内置事件、转化、留存、行为路径等成熟分析模型,提供高可用、低成本的全场景敏捷分析服务,可同时满足数据分析师、数据开发工程师和业务决策人的关键分析需求,帮助企业大幅降低数据分析成本,支撑业务更高效决策。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档