前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark数仓项目】需求二:DWD层会话分隔构建-高德地图API解析经纬度位置

【Spark数仓项目】需求二:DWD层会话分隔构建-高德地图API解析经纬度位置

作者头像
火之高兴
发布2024-07-25 15:48:15
630
发布2024-07-25 15:48:15
举报
文章被收录于专栏:大数据应用技术

写在前面

本项目需求运行在Hadoop10单机环境:

  • Spark3.2.0
  • Flink1.13.6
  • Hadoop3.1.4
  • jdk1.8
  • Sqoop1.4.6
  • MySQL5.7
  • Hive3.1.2
  • Kafka0.11
  • Flume1.9.0
  • Zookeeper3.4.6
  • Hbase2.4
  • Redis6.2.0
  • Dlink0.7.3

Windows11 开发环境:

  • Idea 2020
  • Moba
  • DBeaver7.0.0
  • Scala2.12.17

一、Session会话分隔切割

1.1 会话分隔是为了什么?好处

由需求一清洗后的临时表中可得会话id和会话的时间戳,我们需要将单一设备的会话进一步细化分隔为新的会话。如上图查询结果所示。

Chatgpt: 将用户行为数据的会话分隔成细粒度的时间片可以带来以下好处:

  • 分析用户行为模式:细粒度的时间片可以更好地了解用户在不同时间段内的行为模式和趋势。通过分析用户在不同时间片内的行为,可以发现用户活动的高峰期、低谷期和变化趋势,为业务决策提供更准确的数据支持。
  • 个性化推荐和营销:通过了解用户在不同时间片内的兴趣和需求变化,可以为用户提供更加个性化的推荐内容和营销策略。例如,根据用户在特定时间片内的购买习惯推荐相似商品,或者在用户活跃时间段展示相关的促销活动。
  • 用户行为分析和异常检测:细粒度的时间片可以帮助进行更精细的用户行为分析和异常检测。通过比较用户在不同时间片内的行为特征,可以发现异常行为,如频繁登录、异常购买行为等,及时采取措施防范风险。
  • 优化产品和服务策略:通过时间片分析,可以了解用户在不同时间段内对产品和服务的使用情况。这有助于优化产品的功能、性能和用户体验,以满足用户在不同时间片的需求和期望。

综上所述,将用户行为数据的会话分隔成细粒度的时间片可以提供更详细、准确的用户行为分析和个性化服务,为企业决策和用户体验提供更有价值的数据支持。

1.2 分隔测试SQL代码Demo

该案例是本章节分隔会话需求的拆解测试。 需求是:会话差超过4,定义为一个新的会话。

代码语言:javascript
复制
* sesssionid  newsessionid
 *   abc         abc-0
 *   abc         abc-0
 *   abc         abc-1
 *   abc         abc-1
 *   abc         abc-2
 *   qwe         qwe-0
 *   qwe         qwe-0
 *   qwe         qwe-1
1.2.1 HSQL建表语句
代码语言:javascript
复制
create table tmp.test1(
  sesssionid  string,
  ts          int
)
代码语言:javascript
复制
insert into tmp.test1 
values('abc',2),('abc',3),('abc',8),('abc',10),('abc',18),
      ('qwe',2),('qwe',3),('qwe',9)
代码语言:javascript
复制
select * from tmp.test1
代码语言:javascript
复制
-- 开启本地模式
set hive.exec.mode.local.auto=true;

select sesssionid ,ts,CONCAT(sesssionid,'-',new_row) AS newsessionid
FROM (
	SELECT sesssionid ,ts,before_sid,diff,flag
			,sum(flag) over(PARTITION by sesssionid order by ts) new_row
	from (
		select sesssionid ,ts,before_sid,diff,if(diff>4,1,0) as flag
		from (
			select sesssionid ,ts,before_sid,diff
			from (
				select 	sesssionid ,ts,before_sid,ts-before_sid as diff
				from (
					 select sesssionid ,ts
					 		,lag(ts,1,ts) over (PARTITION by sesssionid order by ts ) as before_sid
			
					 from tmp.test1 
				) t1
			)t2
		)t3
	)t4
)T5	
1.2.2 测试Demo查询结果
在这里插入图片描述
在这里插入图片描述

总结一下上述代码,需要用lag开窗计算会话的时间差,按照会话分隔的粒度计算出一个差值flag标志,flag是为了判断当前会话是否为重新开始的新会话。关键是掌握sum开窗函数的几种用法,sum可以按照给定分区进行累加,也可以进行分区内按照flag分别累加。具体步骤请按照上述HSQL中子查询一步一步验证该思路。

在这里插入图片描述
在这里插入图片描述

以上测试DEMO即为本章节DWD层需求,在开始DWD层会话分割前,先完成该测试,即可应用该思路在项目需求。

1.3 开始分隔Session前的建表工作

首先是创建tmp临时表event_log_splited,因为后续需求仍然需要不断完善。

代码语言:javascript
复制
create table tmp.event_log_splited(
  account           string,
  appid             string,
  appversion        string,
  carrier           string,
  deviceid          string,
  devicetype        string,
  eventid           string,
  ip                string,
  latitude          double,
  longitude         double,
  nettype           string,
  osname            string,
  osversion         string,
  properties        Map<String,String>,
  releasechannel    string,
  resolution        string,  
  sessionid         string,
  `timestamp`       bigint,
  newsessionid      string
)
partitioned by(dt string)
STORED as ORC
TBLPROPERTIES ('orc.compress'='SNAPPY')

用于测试的语句:

代码语言:javascript
复制
select * from tmp.event_log_splited  where dt = '2023-06-22'

alter table tmp.event_log_splited drop partition(dt='2023-06-22')

1.4 编写Spark程序代码

1.4.1 Local测试

以下scala代码完成了从tmp.event_log_washed中的sessionid到 tmp.event_log_splited表中的newsessionid的需求。需求思路即为1.2小节内容。其中工具类已在本项目需求一中给出。

代码语言:javascript
复制
package com.yh.ods_etl

import com.yh.utils.SparkUtils

object AppLogSessionSplit_02 {

  def main(args: Array[String]): Unit = {
    if(args.length == 0){
      println("缺失参数")
      System.exit(0)
    }

    val spark = SparkUtils.getSparkSession("AppLogSessionSplit_02")

    val dt = args(0)

    spark.sql(
      s"""
        |
        |insert overwrite table tmp.event_log_splited
        |partition(dt='${dt}')
        |select
        |   account
        |   ,appid
        |   ,appversion
        |   ,carrier
        |   ,deviceid
        |   ,devicetype
        |   ,eventid
        |   ,ip
        |   ,latitude
        |   ,longitude
        |   ,nettype
        |   ,osname
        |   ,osversion
        |   ,properties
        |   ,releasechannel
        |   ,resolution
        |   ,sessionid
        |   ,`timestamp`
        |   ,concat(sessionid,'-',sum(c1) over(partition by sessionid order by `timestamp`)) newsessionid
        |from(
        |	select
        |     account
        |     ,appid
        |     ,appversion
        |     ,carrier
        |     ,deviceid
        |     ,devicetype
        |     ,eventid
        |     ,ip
        |     ,latitude
        |     ,longitude
        |     ,nettype
        |     ,osname
        |     ,osversion
        |     ,properties
        |     ,releasechannel
        |     ,resolution
        |     ,sessionid
        |     ,`timestamp`
        |	    ,if( (`timestamp`-lag(`timestamp`,1,`timestamp`) over(partition by sessionid order by `timestamp`))/1000/60 > 10 ,1,0) c1
        |	from tmp.event_log_washed where dt = '${dt}'
        |)t2
        |
        |""".stripMargin)

    spark.stop()
  }

}

Local本地测试运行成功hive截图:

1.4.2 提交到Yarn服务器运行

编写一个shell03.DataSplit.sh

代码语言:javascript
复制
#! /bin/bash


dt=$1
if [ "x"$1 == "x" ]
then
  dt=$(date -d "1 days ago" +%Y-%m-%d)
fi

echo " 执行日期 ---------- $dt --------------- "


spark-submit                           \
--master yarn                          \
--class com.yh.ods_etl.AppLogSessionSplit_02   \
--conf spark.yarn.jars=local:/opt/installs/spark3.2.0/jars/*   \
--driver-memory   1G                   \
--driver-cores 	  2                    \
--executor-memory 2G                   \
--num-executors   3                    \
--executor-cores  2                    \
--queue           abc                  \
--jars /opt/app/spark-dw-jar-with-dependencies.jar      \
/opt/app/spark-dw.jar $dt

相比较于需求一种的两个提交Shell,本次的shell只是更换了全类名和新上传的依赖jar包,需要注意是否有新的依赖加入。运行shell如下截图:

在这里插入图片描述
在这里插入图片描述

二、高德地图webAPI解析经纬度位置应用

2.1 为什么要解析?

在这里插入图片描述
在这里插入图片描述

在我们前面需求处理的数据中存储的位置信息是经纬度,现在我们需要获取具体到省市区县的数据,因此我们就需要调用某地图的api来帮助我们解析字段。

2.2 某德地图api获取

在这里插入图片描述
在这里插入图片描述

获取api方式也较为简单,在高德开放平台申请即可,官方文档有详细的使用说明。

2.3 Json工具类hutool依赖 由于高德的返回位置信息是json,所以我们使用了hutool工具类解析json,请加入相关版本的依赖在pom文件。

代码语言:javascript
复制
    <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>

2.3 建库建表

以下是dwd层数据库的创建,以及dwd.event_log_detail表的创建,数据将从前一章节的临时表中插入。

代码语言:javascript
复制
create database dwd;
create table dwd.event_log_detail(
  account           string,
  appid             string,
  appversion        string,
  carrier           string,
  deviceid          string,
  devicetype        string,
  eventid           string,
  ip                string,
  latitude          double,
  longitude         double,
  nettype           string,
  osname            string,
  osversion         string,
  properties        Map<String,String>,
  releasechannel    string,
  resolution        string,  
  sessionid         string,
  `timestamp`       bigint,
  newsessionid      string,
  province          string,   
  city              string,   
  district          string
)
partitioned by(dt string)     
STORED as orc 
TBLPROPERTIES ('orc.compress'='SNAPPY')
      
select count(*)
from tmp.event_log_splited where dt = '2023-06-22' and account = '毕导'
      
alter table dwd.event_log_detail drop partition(dt='2023-06-22')

select *
from dwd.event_log_detail

2.4 编写高德API解析位置代码

这里自定义了udf函数,将api返回字段解析到表格字段中,并且,每一次省市县的解析只调用一次api的解析查询。我们暂时先指定一位叫毕导的用户测试。

代码语言:javascript
复制
package com.yh.ods_etl

import cn.hutool.json.{JSONObject, JSONUtil}
import com.yh.utils.SparkUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.DataFrame
import scalaj.http.{Http, HttpRequest}

object AppLogToDWD_03 {

  def main(args: Array[String]): Unit = {
    if(args.length == 0){
      println("缺失参数")
      System.exit(0)
    }

    val spark = SparkUtils.getSparkSession("AppLogToDWD_03")
    spark.sparkContext.setLogLevel("WARN")

    val dt = args(0)

    spark.udf.register("parse_city",(latitude:Double,longitude:Double) => {
      val request: HttpRequest = Http("https://restapi.amap.com/v3/geocode/regeo")
        .param("key", "1ee98f687f76d5428f279cd0dbc5ad85")
        .param("location", longitude+","+latitude)

      val str: String = request.asString.body
      print(str)

      if(!StringUtils.isBlank(str)){
        val jSONObject: JSONObject = JSONUtil.parseObj(str)
        val jSONObject2: JSONObject = jSONObject.getJSONObject("regeocode").getJSONObject("addressComponent")
        val province = jSONObject2.getStr("province")
        val city = jSONObject2.getStr("city").replace("\\[]","")
        val district = jSONObject2.getStr("district")
        province+","+city+","+district
      }else{
        "null,null,null"
      }
    })



    val df: DataFrame = spark.sql(
      s"""
         |	select
         |	   *
         |	  ,split(parse_city(round(latitude,6),round(longitude,6)),',') c1
         |	from tmp.event_log_splited where dt = '${dt}' and account = '毕导'
         |
         |""".stripMargin)

    df.cache()
    df.createTempView("t2")

    spark.sql(
      s"""
        |
        |insert overwrite table dwd.event_log_detail
        |partition(dt='${dt}')
        |select
        |  account
        |  ,appid
        |  ,appversion
        |  ,carrier
        |  ,deviceid
        |  ,devicetype
        |  ,eventid
        |  ,ip
        |  ,latitude
        |  ,longitude
        |  ,nettype
        |  ,osname
        |  ,osversion
        |  ,properties
        |  ,releasechannel
        |  ,resolution
        |  ,sessionid
        |  ,`timestamp`
        |  ,newsessionid
        |  ,c1[0]  prov
        |  ,c1[1]  city
        |  ,c1[2]  district
        |from t2
        |
        |""".stripMargin)

    spark.stop()
  }
}

本地测试运行界面:

hive中查询dwd层:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-06-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • 一、Session会话分隔切割
    • 1.1 会话分隔是为了什么?好处
      • 1.2 分隔测试SQL代码Demo
        • 1.2.1 HSQL建表语句
        • 1.2.2 测试Demo查询结果
      • 1.3 开始分隔Session前的建表工作
        • 1.4 编写Spark程序代码
          • 1.4.1 Local测试
          • 1.4.2 提交到Yarn服务器运行
      • 二、高德地图webAPI解析经纬度位置应用
        • 2.1 为什么要解析?
          • 2.2 某德地图api获取
            • 2.3 建库建表
              • 2.4 编写高德API解析位置代码
              相关产品与服务
              腾讯云服务器利旧
              云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档