作者:housecheng 腾讯WXG工程师
|导语 解决金融风控数据监控“开发门槛高”“重复工作多”的痛点,实现PSI计算性能十倍速提升。
背景
在金融业务上,质量和稳定是生命线,我们需要对所有已经上线的风控要素,如策略、模型、标签、特征等构建监控。在过去,我们部署监控的方式为:
这种模式主要的问题在于:
为了解决上述问题,我们设计开发了一套“统一监控计算检查工具”(以下简称监控工具),将监控计算拆解成计算任务生成、监控指标计算、监控指标衍生与检查等模块,实现用户无需额外开发,只需要简单的配置即可完成监控指标的计算和推送,避免了人力浪费,提升效能。
整体设计
系统交互视图
与统一监控计算与检查工具交互的主要有接入方和告警系统,所有的监控由接入方发起,接入方可以是特征、标签、模型、策略的兜底检查同学,也可以是具体业务同学。接入方提出监控需求(填写配置),统一监控计算与检查工具根据需求生成计算任务完成计算,如果触发告警则通过告警系统将告警发送给接入方,接入方接受告警后及时修复并反馈登记,监控工具会读取用户的告警反馈重新完成相关计算,直至监控指标在告警阈值内。
为了完成监控指标的计算,统一监控计算与检查工具可以细分为三个核心模块,分别为:
需要注意的是,我们提出了“监控指标衍生的概念”,将不依赖数据源表只依赖监控指标及其历史记录的一类指标称为“衍生指标”,将衍生指标延迟到检查器Checker上计算,可以节省大量计算资源。具体来看,非衍生指标和衍生指标的不同在于:
除了核心模块,统一监控计算与检查工具还提供了“发出告警指令”、“接受告警反馈重新生成计算任务”等辅助模块。以上共同组成了统一监控计算与检查工具,确保触发的异常告警能够得到及时反馈修正。
部署视图
在实际部署上,统一监控计算与检查工具中TaskMaker(任务生成)、Calculator(计算)、Checker(检查)等模块实际上对应一个Spark节点,各个模块之间依赖关系如下图所示。
计算任务主要由TaskMaker模块根据用户配置生成,此外用户反馈已经修复的告警也会重新生成计算任务,TaskMaker屏蔽了不同调度周期的数据任务生成周期不一致的问题,例如日表任务每天生成前一天的表监控任务,月表任务只在每月特定一天生成月表的监控任务;Calculator接受计算任务完成监控指标的计算,Calculator完成监控的多数计算,需要较多的计算资源;Checker完成监控指标的衍生和检查。
模块详细设计
接下来,我们讨论监控工具TaskMaker(任务生成)、Calculator(计算)、Checker(检查)等模块的设计难点。
计算任务生成(TaskMaker)模块
计算任务生成(TaskMaker)模块核心逻辑是:
TaskMaker的主要设计难点在于:需要处理监控任务调度周期与源表计算任务调度周期的差别,适配好hour/day/week/month等不同周期,可以总结为下表:
假设当前数据时间是20210210 11:00,因为调度系统对日任务通常有一天的偏移,此时实际执行时间为20210211 11:00,对于不同类型的源表周期,偏置和最终生成分区举例如下:
源表日表、月表等不同调度周期的问题在TaskMaker模块解决,后续模块不再感知源表周期的区别,专注完成监控指标的计算。
监控指标计算(Calculator)模块
监控指标计算(Calculator)模块核心逻辑(如下图)是:
首先,我们通过实例来解释如何通过执行优化避免重复计算,提升性能:
为了实现执行优化,我们需要将一个监控指标的计算过程拆解为若干个最小可执行单元,称之为函数。具体到实现上,函数保存了计算逻辑的实现代码,过程调用若干个函数完成监控指标的最终计算,如下:
# Function
def F:RDD_aggre(...): ...
def F:math_psi(...): ...
# Procedure
def P:psi(...):
seg = F:RDD_aggre('cal_seg', ...)
cur = F:RDD_aggre('count@cur', seg, ...)
his = F:RDD_aggre('count@-1', seg, ...)
psi = F:math_psi(cur, seg, ...)
return psi
上述计算过程可以转换成计算图DAG来表示,如下图:
更复杂的,当有多个监控计算过程时,DAG可以表示为:
DAG需要执行的部分为叶子节点,为了避免重复计算,
我们对每次执行的叶子节点进行两类类优化:
例如,上述DAG叶子节点表示的可执行函数(叶子节点)为:F:RDD_aggre(cal_seg,表1,A列)、F:RDD_aggre(cal_seg,表1,A列)、F:RDD_aggre(null_rate,表1,B列),其中两个F:RDD_aggre(cal_seg,表1,A列)为同名同参函数,合并为一个执行,又F:RDD_aggre(cal_seg,表1,A列)与F:RDD_aggre(null_rate,表1,B列)是同名函数,可以合并执行F:RDD_aggre([cal_seg, null_rate],[表1, 表1],[A列, B列]),此时原本需要需要三次遍历表,合并为一次遍历表即可完成。
同样的,在第二层叶子节点函数F:RDD_aggre(count@cur,seg, 表1,A列)可以合并为一次执行,但F:RDD_aggre(count@-1, seg,表1,A列)、F:RDD_aggre(count@-6, seg,表1,A列)需要分别遍历不同的表分区(上一周期分区、前6周期分区),因而只能分别计算,第二层叶子节点共产生三次遍历表,如下:
最终,拉取分段计数,在本地完成PSI的计算:
综上,执行优化算法小结如下:
算法: 执行优化算法。
输入:当前全部未执行计算任务对应计算过程。
流程:
- Step1. 将计算过程转化成DAG表示,每个节点为一个执行函数。
- Step2. 如果当前还存在未执行的叶子节点,那么合并叶子节点中的同名函数,当函数名和参数都完全一致时,合并函数;当函数名一致、参数不一致,生成新的执行函数。
- Step3. 执行函数,如果缓存中存在结果,直接拉取结果,否则完成计算后缓存结果。
- Step4. 若还存在未执行的叶子节点,返回Step2,否则终止。
输出:计算过程对应的监控指标结果。
当前,Calcutor支持常见监控指标包括:
通过Calcutor模块可以完成监控指标的计算,但也存在一些监控指标(如空值占比)需要衍生后才能判断是否异常,因而我们设计了Checker模块。
监控指标衍生与检查(Checker)模块
监控指标衍生与检查(Checker)模块核心逻辑为:
Checker的衍生方法包括:
完成衍生后,需要判断指标是否异常,这是通过检查阈值实现的,常见的检查逻辑有:
举个例子,如果要对缺失率波动进行监控,要求其变化幅度小于0.1,并且变化率小于0.2,可以将指标衍生配置为 diff@-1、relative@-1,指标检查策略配置为 abs_less_than@0.1 、 abs_less_than@0.2 。
监控计算优化实例 - PSI计算从20h到2h
在我们的实践中,发现对6w个数据列的psi等4个监控指标的计算,仅日表监控计算耗时长达20h+ ,计算耗时过大,长时间占用集群资源也会导致线上任务延迟。我们分析了造成计算时间长的原因有:
针对这些问题,我们提出了下述方案逐一解决。
PSI计算优化:从4次遍历表到一次遍历表
相比缺失值占比、零值占比只需一次遍历表,计算psi@-1、psi@-6总共需要4次遍历表,具体如下:
为了降低PSI的遍历次数,我们设计了一种基于直方图的PSI估算方法,通过一次遍历表,得到特征分布直方图,再结合历史上计算的其他周期特征分布直方图,就可以估算出PSI。
如下图所示,基于直方图的PSI估算方法主要包括4个步骤: - 步骤一:遍历一次表,使用蓄水池采样数据(>10w),本地计算分段、统计各个分段计数,得到特征的直方图分布h1,如下图; - 步骤二:从历史结果中拉取-n周期的直方图分布h2; - 步骤三:由于“分割点”不一致,我们无法直接根据直方图计算PSI,因此对直方图进行分割,使得当前周期直方图和上一周期直方图的分割点一致,取h1、h2直方图分割点的并集作为新分割点,按照新的分割点重新划分直方图得到h1`、h2`; - 步骤四:根据分隔后的直方图h1`、h2`和PSI计算公式计算PSI即可。
通过PSI计算优化,计算时间从20h -> 7h。
Pyspark Row属性访问优化
我们发现Pyspark实现的Row访问属性有效率问题(如下图,官方源码注释也承认了这一问题),row['field']需要遍历所有的列名,才能得到正确的下标,其时间复杂度是O(n)。
为了解决row访问速度的问题,我们提出了下述方案:
通过使用了少量的内存存储[列名->列下标]映射,即能将Row属性访问复杂度从O(n) -> O(1),最终实验证明计算时间从7h -> 4h。
超大表的优化:采样与避免序列化
我们观察到,目前存在少量监控表行数达到20亿+,历史原因其格式为format(慢于orcfile),这些表全表遍历计算监控指标的时间达到数个小时。
针对这种超大表,我们提出了采样和避免序列化的优化方法,具体来说:
通过上述优化,对于20亿+行数的大表计算时间从数个小时到几十分钟,并最终实现总体计算时间从20h -> 2h的优化。
小结
针对金融风控要素监控的“开发门槛高”“重复工作多”等问题,本文提出了“统一监控计算与检查工具”这一解决方案,本文详细论述了该方案TaskMaker、 Calculator、 Checker等各个模块的设计实现,最终实现了用户无需额外开发,只需要简单的配置即可完成监控指标的计算和推送,避免了人力浪费,提升效能。最后,我们还给出了一个“监控计算模块”优化的实例,通过“直方图估算PSI”、“Row列名广播”、“采样与避免序列化”等方式,将监控计算的速率提升了10倍,节省了大量计算资源。
近期热文
让我知道你在看