00:00
接下来我们开始讲下一个拈,从这个拈开始,正式的就就是另外一大类指标了,所谓的风控类指标,前面基本上都属于统计类指标啊,那首先我们要面对的这个需求就是恶意登录监控,那所谓的恶意登录监控主要就是说在很多平台里边,用户账户的这个安全都是大家需要注意考虑,呃,要去重点保护的,那比方说在这个电商平台里边,如果我们发现某一个用户,他的那个账户啊啊,就是在不停的被尝试登录,而且大家知道这个尝试登录他肯定就会失败嘛。他在试密码啊,那不同不停的出现这个登录失败,如果短时间内来了大量登录失败的话,我们其实是有理由相信这是遇到了这个恶意恶意登录的这样的一个攻击,对吧?啊,所以这个时候我们可以把它检测出来,做一个提示报警啊,那同样也是我们的任务,只是检测到这样的行为,输出报警就可以了,至于说后续你是说给用户发通知信息,还是说诶直接就把这个用户的这个账户暂时封掉,对吧,这是考虑到我们具体的业务逻辑了啊,所以我们主要是检测这个行为,那首先大家会想到。
01:12
我们看一下当前具体的这个需求啊,具体需求要求是如果同一用户在,呃,大家注意这里是可以是不同的IP,因为大家想具体在做这个,如果说有人在做这个攻击的时候啊,电脑暴力破解密码的时候,它是有可能切换密,切换当前的这个IP的,对吧?但是这个同一用户user ID,这肯定账户这是唯一的标志对吧?呃,就是如果他当前还是同一个user ID在尝试登录的话,那当然这就是遇到风险了,呃,我们的要求就是同一用户在两秒之内连续两次登录失败,就认为存在这个恶意登录的风险,所以这里边这个判断的次数还比较少啊,所以这也是简化了后边我们判断的一个逻辑啊。呃,那大家想想接下来这个代码怎么样去做呢?
02:02
我们先把在先在这个呃当前的项目里边,把这个新的模块先创建出来啊,先把架子搭好,首先是一个新的module,然后接下来当前这个我们就叫做呃恶意登录的监控,对吧,这个就叫做呃,Log logging file detect。把这个模块先创建出来。首先我们现在暂时还想不到里边这个炮门件要追加什么依赖,对吧?啊,我们就先把它放在这儿,只要有弗link和卡夫卡相关的依赖就够了。然后接下来同样还是在source main下边啊,Java目录下边,我们去新建对应的想要去创建的这个,呃,主体的这个类啊,我就直接把它叫做login feel吧,我带上包名啊,com.at硅谷点。Logging file detect。
03:01
当前的我这个主体代码就叫做login。呃,我把这个main方法主方法先写出来,Throws exception。里边当然了,先把这个执行环境先创建出来。把它叫做env流逝的执行环境啊,同样我们还是把这个并行度啊,先设成一,然后设置当前的时间语义,我先设成事件时间语义,对吧?大家想到当前肯定也是跟事件发生的时间相关的,这是前提。那这里面就涉及到一个问题,我们的数据到底又是呃,什么样的数据呢?还是我们当前没有在user behavior user behavior里边买点日志,看到对应的那个登录信息,所以这里边我们用一个还是自定义的一个测试数据啊来做一个,呃,做一个实现,那这里边主要就是这个这个文件logging log csv。
04:03
把它直接先放到当前的这个resource resources下边来。大家看一下这个本来又是CSV文件,这又是ETL过滤好的逗号分割的这样的形式,我们看一下这里面的字段有哪些逗号分割主要就四个字段对吧?然后大家看到前面这应该是对,大家想肯定是一个用户ID对不对,你既然是登录行为嘛,那应该就是某一个用户,诶在什么时间点,然后大家看后面还有这个IP对不对?诶做了一次登录操作,然后这里边呃,就没有记录这个行为的类型了,大家想这ETL之后是不是所有的都是登陆行为啊,登陆行为大家看是不是就两个结果,到底是成功还是失败对吧?呃,就就这样的两个结果而已,所以后边一个字段就是当前啊事件的类型,最后当然就是时间戳了,所以里边既然有时间戳,这里边我们直接设计的时候,使用这个事件时间完全没有问题,对吧?啊,那当然了,接下来我们还是要在下边去创建一个。
05:06
呃,对应的啊,我们beans有一个这样的包,然后接下来在里边应该要有对应的po类型,对吧?诶首先我们是把这个就当前啊,我们登录的这个事件要有一个要有一个包装,我们就把它叫做logging event。那么在这个里边大家看到对应的第一个,首先当然要的是一个长整型的用户ID了,User ID对吧?然后接下来就是还有一个是IP对吧,第二个大家记得是这个IP啊啊,尽管这里我们想到可能没有什么太大的用,但是也是把它先包装好,既然都已经提取出来了啊IP,然后接下来还有对登录的这个结果,或者说一个登录的状态,对吧。比方说我们管这个叫做logging state一个状态,然后接下来,呃,还有就是time,对吧,长整型的时间戳。
06:11
所以接下来还是把这个构造器无参的先创建好,然后是带参数的。接下来还有要求的get set创建好,最后。再来一个to string方法重写出来,这就是我们一开始想要创建出来的这个po类型啊,然后另外我们还涉及到一个就是你最终不是要报警吗?那检测出来,假如说它真的是这个,呃,两秒之内连续登录失败呃两次以上了,那检测出来这个报警检测出来之后你要报警,报警输出什么东西呢?我们也把它包装成一个输出的po类型,对吧?啊,这里边同样啊,在这个B下边。New一个JA class当前这个就直接叫做logging warning对吧,报警信息啊。
07:06
那大家想这个报警的时候,我们想知道什么呢?当然首先是哪个用户对不对,呃,长整型的user ID把这个列出来,然后接下来,呃,当前这个用户报这个当前登录这个连续登录失败啊报警了,那我可能感兴趣的是在诶哪个时间到哪个时间范围内,对吧?它连续登录失败,呃这个多少次,那所以这里边我其实是可以提取出第一次登录失败的时间,时间戳,然后最后一次登录失败的时间戳,对吧?把这个可以提取出来private长整型的一个时间戳。把这个叫做first few time,下面一个是长整型的last few time。最后我还可以给一个string类型个说明,对吧?呃,就当前的一个是呃,Warning报警的一个具具体的信息,Loading message,好,这就是我们这样一个输出报警结果。
08:09
的一个设置。空餐的放在这儿,然后接下来是带参数的啊。后面还有就是get set啊,这个过程尽管枯燥,但是大家还是要完整的把它写出来,这个时候大家肯定就比较怀念这个SKY里面的写法是吧?啊,SKY里面直接kiss class一行直接搞定了啊好,呃,不管怎么样,我们快速的把这些。需要做的准备工作都搞定,接下来就是看这个代码里边怎么样去实现了,对吧?哎,那首先代码里边我们应该要做的还是先去读取数据。从文件中读取数据,好,那呃,这个过程还是差不多的啊,我们基于当前的这个类。Logging file对吧?呃,直接用这个反射去get当前的这个resource,那当前我们那个文件是叫logging log csv对吧?我直接用这个相对路径直接把它获取到resource拿到,然后接下来当然就是env read text file,这里边我就直接传resource.get pass,直接拿到这个就完事了,对吧?呃,然后这里边我就不单独的再去拆开了啊,后面直接就去做转换就可以了。
09:29
接下来是不是直接map呀?啊,Map需要把这每一行解析出来,同样还是既然是CSCV文件,那就是逗号分割,先得到这样一个呃,字符串数组,对吧?Fields等于当前的line,做一个sweet逗号分割,接下来return一个。你有一个,呃,当前我们是不是想要的就是那个logging包装起来的那个叫做event对吧。
10:04
登录事件我们包装的叫这个这个类型啊呃,然后接下来里边字段,首先是一个长整型的user ID对吧,FIELDS0。然后诶,这个在外边啊这里,然后后边呃,应该是就是接接下来我们的对应的那个数据应该是一个字符串类型的IP和一个字符串类型的登录状态,对吧?哎,所以这两个都是string,这个就直接FIELDS1FS2,最后还有一个长整形的时间戳FIELDS3填进去。这就是我们前面转换成pole类型的一个过程,接下来不要忘记还有a STEM STEM and watermarks对吧?那接下来这个到底给升序的处理还是乱序的处理呢?哎,那就又得看这里边的这个数据了,对吧?我们看一眼这里的数据啊呃,1526,诶大家看3332,这个有乱序对吧?哎,这里边即使是这个一两个数据有乱序,这也是乱序嘛啊,所以当前的这个处理,我们必须要定义一个处理乱序的这样的一个时间啊啊,当然具体的这个时间的话,我们给的是最大乱序程度,这就大概拍脑袋拍脑袋想了是吧。
11:24
Bounded out of orderne,首先这里面提取这个时间的话,Element是提着里边的时间戳,另外大家要注意秒还是毫秒。当前这个应该还是秒对吧,一看就是秒,所以乘以1000,另外上边给一个延迟的时间,大家看这个大概是一个秒级的延迟对吧?啊所以呃,我们可能也不可能hold住所有的数据啊,大概的看一看啊,大家看有一些这个还这个延迟程度挺大,5953对吧?哎,这里边我就直接比方说给一个三秒钟的延迟吧,就大大家就是可以知道,就只要hold住大部分情况就可以,对吧?啊,我们这里边给一个三秒钟延迟。
12:08
好,这里得到的把它叫做,呃,我还是重新给另一个名吧,这个叫logging,就叫event stream。对吧,前面这里大家也可以把这个直接改成data stream啊,Event logging event得到了当前的读取数据的数据类型,然后接下来呢,自然就是要去做这个检测了,大家想这个检测的过程的话。这就涉及到我们这个具体的流程了,怎么样去做检测呢?那这个思路其实大家根据这个两秒之内,呃,连续登录失败两次,大家是不是联想到了,之前我们讲到理论部分的时候,讲process function的时候,有一个十秒之内温度连续上升的检测啊。哎,我们现在是两秒之内连续登录失败,大家一看这个描述是不是感觉就很类似啊,哎,所以之前我们是怎么样去做的,是不是就是第一个啊,发现这个温度值啊,这个上升了之后,第一个数据来了之后,是不是要注册一个定时器啊,然后十秒钟之后触发对吧?然后在这个处理的过程当中,如果连续来的这个温温度值都是上升的话,我保存一个状态,保存上一次的那个温度值,如果接下来都是温度上升,那就什么都不做,等待出这个定时器触发,呃,输出这个报警,那那如果说要是中间温度下降的话,是不是我就把定时器删掉啊啊,接下来就重新开始对吧,清空状态重新开始,那现在我们想到也类似啊,我是不是可以第一个检测到登录失败的时候,我注册一个两秒之后的定时器,对不对,然后哎,接下来就怎么样,我就等这个。
13:48
当前的这个登录失败的数据来对吧,如果说这个登录失败的数据连续来的都是失败的话,我就不管它后面的那个定时器,等它触发,然后输出报警,那如果说要是中间来了上升的话,那是不是,呃,不不是上升哈,那那就是温度了啊,我们现在如果中间来了登录成功的话,那就是是不是删掉定时器重新开始啊,哎,所以这个整体思路我们可以想到是差不多的一种思路,那大家想我们在调用API的时候怎么调。
14:19
是不是直接就是process啊,因为涉及到定时器对不对,涉及到定时器只有process方式能够搞定,那这里边呃,我们就是。自定义处理函数检测连续登录失败事件,那大家可以想一下,就是我们基于前面的这个logging event stream是直接就process吗?后面我们如果要是想要检测这个登录失败事件的话,那大家想我是不是得把之前的那个登录失败要保存下来呀,啊,对吧?啊,而且就是大家会会想到我还不能说是只保存,像之前那个我只保存上一个温度值,因为那个保存上一个温度值,它是只要判断比上一次的温度大,就就保证它是上升的,对吧,那我现在是不是最后还要我还要检测最后这个一共有几次登录失败啊,诶,所以我这里面其实你如果要说保存的话,应该保存的是一个个数,对吧?啊,那我干脆就是简单,简单粗暴一点,我直接保存成一个list,把当前所有登录失败事件都存到一个list里边,最后定时器触发的时候,我是不是判断那个例子里边,诶是不是比你这个定义的两次这个。
15:36
这最大的这个要报警的次数要要大,是不是就可以考虑到底要不要输出报警啊,而且这样的话,我这个还很容易扩展啊,你如果要求是两秒钟之内连续三次登录失败,是不是我就判断它只要大于等于三就可以了,这个实现还是比较简单的,但是大家想这里边我们既然涉及到状态,那这个状态是我们这里所有用户的那个登录事件都要放在一起考虑吗?有成功有失败混在一起考虑吗?是不是要针对当前对单个用户啊,只有当前某一个用户,他自己的这个登录行为,接下来我们才保存成一个一个列表,对吧,才存这个状态,那所以接下来是不是我首先要做一个。
16:20
按照user ID做一个分组对吧?来做一个k buy当前的user ID啊,当然大家也可以直接写成这个方法引用的形式,对吧?刚才我的那个类叫做logging event,直接get user ID啊,做一个分组。当然下边就是直接process了,自定义一个key process function,我把这个叫做呃,Logging file detect warning对吧,检测。报警这样的一个,呃,自定义的kid process function,这就是我们整个一个基本的处理流程了,啊啊,那后边这可以得到一个最终的结果,我把这个叫做warning stream,最后可以把这个warning stream做一个打印输出,最后不要忘记还有env execute执行起来,当前这个叫logging。
17:17
Detect job,这就是我们整体的代码的一个处理流程。
我来说两句