大家好,又见面了,我是你们的朋友全栈君。
数仓的分层也是一样,每一层都有自己的职责,同时都是基于下一层或者下面多层做数据处理之后的结果. 这样一来,最上层就是ADS,数据应用层,当更上层需要数据时,不需要再从最底层进行数据计算,可以复用中间层级的现有结果,可以提升数据处理速度. 同样的,因为更上层数据都是从下一层或者下面多层数据处理而来,这样就算下层数据丢失,也不会造成企业所有数据毁灭性灾难,算是一种数据冗余机制,不过更上层数据一般做了数据处理,提升了维度信息.
注意,数据估算最好结合公司实际情况,如果已经运行一段,可以让运维同事帮忙做估算 因为数据本身可以做压缩,数仓数据还需要做分层,数据本身存储时还会有备份机制(HDFS\Kafka等框架) 数据还会源源不断增长,同时磁盘还需要预留一定缓冲空间,一般是30%缓冲空间.所以除非是新建项目或者遇到超快速增长的公司,一般的大数据容量评估都是按照最高上限做半年甚至一年总容量做评估的. 注意,在服务器领域,磁盘的成本相对CPU\内存来说,成本是相对最低的,甚至有专门的存储服务器,如24硬盘位,甚至48硬盘位的服务器. 而2020年先在已经开发出了单磁盘12TB甚至14TB的企业硬盘,这意味着单节点机器容量上限进一步提升,存储成本也随着技术提升,成本开始降低下来.
注意,有的公司ODS层不会做太多数据过滤处理,会放到DWD层来处理. 有的公司会在一开始时就在ODS层做数据相对精细化的过滤.这个并没有明确规定,看每个公司自己的想法和技术规范
hive的外部表,对应的是业务表; hive外部表,存放数据的文件可以不是在hive的hdfs默认的位置,并且hive对应的表删除时,相应的数据文件并不会被删除.这样对于企业开发来说,可以防止因为删除表的操作而把宝贵的数据删除掉 hive的业务表,则相反.数据文件存放在hive对应的默认位置,表删除时,对应文件也会被删除掉.
大数据开发,使用hive时,一般都是使用外部表
create external table xxx(
)
实际企业开发,并没有定法,有些公司的ODS层数据,采取压缩方式存放,如parquet这类列式存储,带索引,带压缩的文件格式.这样既可以降低存储压力,也能提升查询效率,还有很好的框架兼容性
drop table app_event_log;
create external table ods.app_event_log
(
account string,
appId string,
appVersion string,
carrier string,
deviceId string,
deviceType string,
eventId string,
ip string,
latitude double,
longitude double,
netType string,
osName string,
osVersion string,
properties map<string,string>,
releaseChannel string,
resolution string,
sessionId string,
`timeStamp` bigint
)
partitioned by (y string,m string,d string)
row format serde 'org.openx.data.jsonserde.JsonSerDe'
stored as textfile
;
不过实际企业开发中,越来越少企业使用HBase进行ODS数据存储. 一个是rowkey限制维度信息, 第二个是SQL支持不好,虽然有Phoenix,但是对比Hive还是有不足 而且使用Hive管理数据,后续使用Kylin,presto还有很好的兼容性
维度退化,其实从代码角度来说,就是当一个代码写死之后,失去了灵活性,维度就退化了. 在数仓理论中,有几个经典思想,一个是去除数据冗余.所以一般会把维度信息单独存放,其他表要使用时,记录对应维度的id即可. 这样,就算维度表中数据发生了变化,其他表数据因为只是记录了id,不会有影响. 同时,维度信息放在一张表中存放,而不是每个表中存储一份,将来需要调整,只需要做一次工作即可,降低了数据冗余. 这一点和代码的实现和设计思想是一致的,不要重复造轮子.
这样带来好处就是,后续业务处理时如果需要这些信息,直接使用即可.不过会一定程度增加数据量,但一般都还可以接收,增加并不多. 注意,数据映射一般只映射常见指标以及明确的企业开发中后续会用到的指标,因为数据量较大,如映射的指标后续用不到,只会平白增加开发,维护成本.
这是为每一个用户生成一个全局唯一标识的过程.主要是将匿名访问用户绑定到一个全局唯一id上 合适的用户标识对于提高用户行为分析准确性有很大的影响,这是DWD层最关键的一个技术设计点 这对于漏斗分析,留存,session等关键指标准确性至关重要
在采集到的数据中,可以使用app端的deviceid,userid,可以使用web端的cookieid,ip,userid,可以使用小程序数据中的openid,userid 而实际生活中,用户可能有很复杂的使用状态,而产生的数据需要尽可能覆盖多种情况,这样可以让结果尽量贴近真实情况. 登录状态访问app 匿名状态访问app 登录状态访问web 匿名状态访问web 登录状态访问wx小程序 匿名状态访问wx小程序 一个用户可能拥有不止一台终端设备 一台终端设备上可能有多个用户使用 一个用户可能一段时间后更换手机
方案1:
只使用设备 ID 适合没有用户注册体系,或者极少数用户会进行多设备登录的产品,如工具类产品、搜索引擎、部分小型电商等。 这也是绝大多数数据分析产品唯一提供的方案。 不足点: 同一用户在不同设备使用会被认为不同的用户,对后续的分析统计有影响。 不同用户在相同设备使用会被认为是一个用户,也对后续的分析统计有影响。 但如果用户跨设备使用或者多用户共用设备不是产品的常见场景的话,可以忽略上述问题
方案2:
关联设备 ID 和登录 ID(一对一) 适合场景,成功关联设备 ID 和登录 ID 之后,用户在该设备 ID 上或该登录 ID 下的行为就会贯通,被认为是一个 全局 ID 发生的。在进行事件、漏斗、留存等用户相关分析时也会算作一个用户。 关联设备 ID 和登录 ID 的方法虽然实现了更准确的用户追踪,但是也会增加复杂度。 所以一般来说,我们建议只有当同时满足以下条件时,才考虑进行 ID 关联: 需要贯通一个用户在一个设备上注册前后的行为。 需要贯通一个注册用户在不同设备上登录之后的行为 不足点: 一个设备 ID 只能和一个登录 ID 关联,而事实上一台设备可能有多个用户使用。 一个登录 ID 只能和一个设备 ID 关联,而事实上一个用户可能用一个登录 ID 在多台设备上登录
方案3:
一个用户在多个设备上进行登录是一种比较常见的场景,比如 Web 端和 App 端可能都需要进行登录。支持一个登录 ID 下关联多设备 ID 之后,用户在多设备下的行为就会贯通,被认为是一个ID 发生的。 不足点: 一个设备 ID 只能和一个登录 ID 关联,而事实上一台设备可能有多个用户使用。 一个设备 ID 一旦跟某个登录 ID 关联或者一个登录 ID 和一个设备 ID 关联,就不能解除(自动解除)。 而事实上,设备 ID 和登录 ID 的动态关联才应该是更合理的
方案4:
关联设备 ID 和登录 ID(动态修正) 基本原则,与方案3相同 修正之处,一个设备ID被绑定到某个登陆ID(A)之后,如果该设备在后续一段时间(比如一个月内)被一个新的登陆ID(B)更频繁使用,则该设备ID会被调整至绑定登陆ID(B) 核心点在于同一台设备不同登录账号,根据一定规则打分,设备关联哪个账号根据得分最高者来决定. 一般是登陆次数多的得分高,但如果此前登录次数多,但类似卖出手机场景,新用户新账号登录次数变多,则旧帐号会持续扣分,新账号会持续加分,最后新账号得分高出. 扣分规则一般是乘以一个负数,这样降低梯度对比减法会更快.
实现案例:
object IdBind {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val spark = SparkSession.builder()
.config("spark.sql.shuffle.partitions","2")
.enableHiveSupport() // 开启hive整合支持(同时,需要引入spark-hive的依赖;引入hadoop和hive的配置文件)
.appName("演示")
.master("local")
.getOrCreate()
// 加载T日日志数据
val logDf = spark.read.table("ods.app_action_log").where("dt='2020-10-07'")
logDf.createTempView("logdf")
// 计算T日的 设备->账号 绑定得分
val loginCnts = spark.sql(
""" | |select |deviceid, |if(account is null or trim(account)='',null,account) as account, |-- count(distinct sessionid) as login_cnt, |min(timestamp) as first_login_ts, |count(distinct sessionid)*100 as bind_score |from logdf |group by deviceid,account | |""".stripMargin)
loginCnts.createTempView("today")
println("当天评分结果")
loginCnts.show(100)
// 加载 T-1的 绑定得分 (从hive的绑定评分表中加载)
// val bindScorePre = spark.read.parquet("dataware/data/idbind/output/day01")
val bindScorePre = spark.read.table("dwd.id_account_bind").where("dt='2020-10-06'")
println("历史评分结果")
bindScorePre.show(100)
bindScorePre.createTempView("yestoday")
// 全外关联两个绑定得分表
// 并将结果写入hive表的当天分区(T-1日分区就无用了)
val combined = spark.sql(
""" | |insert into table dwd.id_account_bind partition(dt='2020-10-07') | |select |if(today.deviceid is null,yestoday.deviceid,today.deviceid) as deviceid, |if(today.account is null,yestoday.account,today.account) as account, |if(yestoday.first_login_ts is not null,yestoday.first_login_ts,today.first_login_ts) as first_login_ts, |-- if(today.account is null,yestoday.login_cnt,today.login_cnt+yestoday.login_cnt) as login_cnt, |if(today.account is null,yestoday.bind_score*0.9,today.bind_score+if(yestoday.bind_score is null,0,yestoday.bind_score)) as bind_score |from | today |full join | yestoday |on today.deviceid=yestoday.deviceid and today.account=yestoday.account | |""".stripMargin)
spark.close()
}
}
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/153170.html原文链接:https://javaforall.cn