目录
本实战项目的数据是采集自电商的用户行为数据. 主要包含用户的 4 种行为: 搜索, 点击, 下单和支付. 数据格式如下, 不同的字段使用下划线分割开_:
数据说明:
品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.
思路 1
分别统计每个品类 点击的次数
,下单的次数
和支付的次数
.
缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常的耗时
思路 2
最好的办法应该是遍历一次能够计算出来上述的 3 个指标.
使用累加器可以达成我们的需求.
添加依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包插件, 否则 scala 类不会编译并打包进去 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
测试能否读到数据(写一步,测试一步,养成良好的习惯)
import org.apache.spark.{SparkConf, SparkContext}
object ProjectApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("project")
val sc = new SparkContext(conf)
//从数据把文件读出
val sourceRDD = sc.textFile("D:\\idea\\spark-knight\\spark-core-project\\input\\user_visit_action.txt")
//测试一下能否读取数据
sourceRDD.collect().foreach(println)
//关闭项目(sc)
sc.stop()
}
}
能够读到数据
封装用户行为的bean类
/**
* 用户访问动作表
*
* @param date 用户点击行为的日期
* @param user_id 用户的ID
* @param session_id Session的ID
* @param page_id 某个页面的ID
* @param action_time 动作的时间点
* @param search_keyword 用户搜索的关键词
* @param click_category_id 某一个商品品类的ID
* @param click_product_id 某一个商品的ID
* @param order_category_ids 一次订单中所有品类的ID集合
* @param order_product_ids 一次订单中所有商品的ID集合
* @param pay_category_ids 一次支付中所有品类的ID集合
* @param pay_product_ids 一次支付中所有商品的ID集合
* @param city_id 城市 id
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long)
case class CategoryCountInfo(categoryId: String,
clickCount: Long,
orderCount: Long,
payCount: Long)
定义累加器
import bean.UserVisitAction
import org.apache.spark.util.AccumulatorV2
//in:UserVisitAction out:Map[(品类,"click")->count] (品类,"order") -> (品类,"pay"),->count
class CategoryAcc extends AccumulatorV2[UserVisitAction,Map[(String,String),Long]]{
private var map = Map[(String, String), Long]()
// 判断累加器是否为"零"
override def isZero: Boolean = map.isEmpty
// 复制累加器
override def copy(): AccumulatorV2[UserVisitAction, Map[(String, String), Long]] = {
val acc = new CategoryAcc
acc.map = map
acc
}
override def reset(): Unit = Map[(String, String), Long]() // 不可变集合需要赋值个新的空集合
//分区器累加
override def add(v: UserVisitAction): Unit = {
//分别计算3个指标
// 对不同的行为做不同的处理 if语句 或 模式匹配
v match {
//点击行为
case action if action.click_category_id != -1 =>
val key = (action.click_category_id.toString, "click")
// 这里其实是等价于 map = map + (.....) 不可变集合是给map赋值新的集合
map += key -> (map.getOrElse(key, 0L) + 1L)
// 下单行为 切出来的是字符串 "null", 不是空的null
case action if action.order_category_ids != "null" =>
// 切出来这次下单的多个品类
val cIds: Array[String] = action.order_category_ids.split(",")
cIds.foreach(cid => {
val key: (String, String) = (cid, "order")
map += key -> (map.getOrElse(key, 0L) + 1L)
})
// 支付行为
case action if action.pay_category_ids != "null" =>
val cIds: Array[String] = action.pay_category_ids.split(",")
cIds.foreach(cid => {
val key: (String, String) = (cid, "pay")
map += key -> (map.getOrElse(key, 0L) + 1L)
})
// 其他非正常情况, 做任何处理
case _ =>
}
}
override def merge(other: AccumulatorV2[UserVisitAction, Map[(String, String), Long]]): Unit = {
// 1.把other中的map合并到map中
// 合并map
/*other match {
case o: CategoryAcc =>
o.map.foreach {
case (key, count) =>
map += key -> (map.getOrElse(key, 0L) + count)
}
case _ =>
throw new UnsupportedOperationException
}*/
// 2.对other的map进行折叠, 把结果都折叠到map中
// 如果是可变map, 则所有的变化都是在原集合中发生变化, 最后的值可以不用再一次添加
// 如果是不变map, 则计算的结果, 必须重新赋值给原的map变量
map = other match {
case o: CategoryAcc =>
o.map.foldLeft(map) {
// case出来的任何东西都不能改, 只能读
case (map, (cidAction, count)) =>
// 对不可变来说所以这是错的. !!!!!!!!!!!
// map += cidAction -> (map.getOrElse(cidAction, 0L) + count)
// 相当于 map = map + (....)
//直接返回新的集合就可以了
map + (cidAction -> (map.getOrElse(cidAction, 0L) + count))
}
case _ =>
throw new UnsupportedOperationException
}
}
override def value: Map[(String, String), Long] = map
}
1.App补全
import bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ProjectApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("project")
val sc = new SparkContext(conf)
//从数据把文件读出
val sourceRDD = sc.textFile("D:\\idea\\spark-knight\\spark-core-project\\input\\user_visit_action.txt")
//把数据封装号(封装到样例类中)
val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
val fields = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong)
})
//需求1:
CategoryTopApp.calcCategoryTop10(sc, userVisitActionRDD)
//关闭项目(sc)
sc.stop()
}
}
2.计算Top10 热门品类的具体代码(在APP中创建)
import acc.CategoryAcc
import bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object CategoryTopApp {
def calcCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]) = {
// 使用累加器完成3个指标的累加: 点击 下单量 支付量
val acc = new CategoryAcc
sc.register(acc)
userVisitActionRDD.foreach(action => acc.add(action))
// 1. 把一个品类的三个指标封装到一个map中
val cidActionCountGrouped: Map[String, Map[(String, String), Long]] = acc.value.groupBy(_._1._1)
// 2. 把结果封装到样例类中
val categoryCountInfoArray: Array[CategoryCountInfo] = cidActionCountGrouped.map {
case (cid, map) =>
CategoryCountInfo(cid,
map.getOrElse((cid, "click"), 0L),
map.getOrElse((cid, "order"), 0L),
map.getOrElse((cid, "pay"), 0L)
)
}.toArray
// 3. 对数据进行排序取top10
val result: Array[CategoryCountInfo] = categoryCountInfoArray
.sortBy(info => (-info.clickCount, -info.orderCount, -info.payCount))
.take(10)
result.foreach(println)
}
}
3.运行结果
版权声明:
本文为《暴走大数据》整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|冷眼丶