00:00
上节课已经对这个登录失败,连续登录失败的这种异常的行为做了一个检测报警,但是大家回忆一下,发现我们当时的实现其实有一些问题。在做测试的时候,其实我们也发现了,就假如说我们在两秒钟之内,现在的逻辑是判断,如果要是有两个以上的登录失败,我们从把它全存到那个list state里边,如果发现它大于等于两个,我们就认为要报警,对吧?但是在这个过程当中,我们是不是一定要等到两秒钟那个定时器触发的时候才能判断啊?那在中间如果要来了一个成功的一个的话,是不是相当于就会把我们之前即使应该报警了也会把它清空,或者说在中间过程当中,即使是没有这个成功把它清空的这种状态出现,连续都是失败,那是不是大家会想到这个其实效率也比较低啊。本来你前面都已经攒了100个登录失败的这个信息了,然后你非得等到他已经攒到1000个的时候,到两秒钟的时候才去触发一个报警,我们是不是想到按照逻辑的话。
01:09
应该是第二个登录失败来的时候就应该把它触发输出报警了。啊,所以那我们能不能基于这个想法,在之前的基础上做一些调整,做一些改进呢?啊,当然是可以的,那这节课我们就来把这个代码先做一个改改进啊呃,首先大家想一想这个这里边我们如果要是说判断第二个元素,第二个登录失败的这个事件来了之后就去触发操作的话,那相当于我们这个过程是不是就已经不需要。是是不是不需要定时器了呀,哎,那我们这个这里边两秒钟的这个判断,这个时间在哪里有用。是不是应该是来的那个数据来了之后,他应该带着时间戳,对吧?我们是不是直接可以把之前的那个数据也保存下来,直接比对他们时间戳,假如说他们之间的这个时间的差距在两秒以内的话,诶,那是不是就认为应该触发这样一个操作啊,如果说要是在两秒以外,那就算了,对吧,那就相当于是已经超过两秒钟之后了,所以大家想用这种思路,其实就不需要定时器了,哎,所以好像是这个思路更清晰一点,好像还更简单一些,我们尝试一下。
02:33
那首先是不是把这个定时器就直接注掉了,对吧?这部分我就不管了,然后大家会想到我得改这个process element,我把这一部分也先注掉。我就在这个基础上直接再做一个其他的实现,那我们接下来是要是要怎么实现这部分内容呢。其实这要一个一个判断了,Process element现在就是。
03:01
就是来一个数据处理一次判断一次,对不对啊,那我们现在的逻辑就应该是是不是if,首先就得判断。首先是不是还是应该判断当前的type类型到底是费还是success啊,对吧?那大家其实会想到,如果是fail的话,这是我们要去处理的情况。那如果要是success呢?如果要是别的情形呢?那简单是不是就应该直接清空状态完事啊,跟之前的这个这个处理是不是类似啊,所以这个是类似啊,如果是成功。清空状态。所以呃,这里假如我们状态还是保存在这个loving fieldel state里边,那么直接调它的clear方法,当然上面是如果是失败的话。
04:04
嗯,判断失败才去做处理对不对?呃,那这里边我们接下来是不是还得去判断,是判断跟前面的一次失败去做对比啊啊,要做这样一个判断了,判断之前是否有失登录失败事件。呃,那如果要想判断之前是否有登陆失败事件,那我们的之前的内容是不是应该都存在了,都存在这个loading fairel list里了,对吧?所以这里边我们就把这个。先拿出来,那这个拿的时候我们还是用直接用这个迭代器拿吧,定义一个e logging fielduel state它的get方法,然后可以得到它的这个迭代器,对吧?那大家会想到,如果这个迭迭代器里边有东西的话,那是不是就代表之前有过。
05:03
这个呃,登录失败的事件,所以现在的逻辑其实就是if it has next。我们现在是如果已经有。登录失败事件。那么。就。哎,我我我们现在是直接输出吗?连续两次登陆失败了,对吧,直接输出吗?还得比较那个时间是不是符合两秒之内对不对,就比较事件时间。好,所以接下来我们其实是先要把它拿到,对不对,Tnas的话先拿到啊,那么first。就应该等于它的next,然后接下来我们是不是应该去比较,如果。
06:07
如果什么是不是两次时间间隔,如果要是小于两秒钟的话,时间戳小于两秒钟的话,那就应该输出一个报警信息,如果要是大于两秒钟的话,是不是什么事情都不做啊?所以这里边我们的判断是value.even time,这是当前的这个时间对不对?它如果要是对啊,当然我们这里边可以直接减大于什么,或者我可以直接判断它小于是不是前面一次fail的那个even time加二的话。大家看这一项是不是一样啊,对吧,是不是就相当于它减它要小于二,这不是,这是不是就代表是在两秒钟之内。不可能会有什么存到,诶对,我们现在先按照这个顺序的这个流程先写下来,后边大家会考虑到可能还有别的问题对不对,呃,还可能还有一些乱续数据的问题,这个我们后续再再想啊,我们先把这个逻辑实现。
07:13
然后接下来是如果两次间隔小于两秒,那么就输出报警对吧。呃,输出报警怎么输出,我们的主流就是报警信息,所以是out.collect这里边把它包成一个warning,诶,这里边要的那个信息是user ID,然后第一次的失败时间,第二次的失败时间,还有一个字符串,那么user ID,那就是啊,这个简单,当前不是有value吗?value.user ID是不是就是啊,它KY之后的啊,然后first field的。K,对不对啊,当然这里边如果我直接从这里去取的话,拿出来的应该不是毫秒数,是一个以秒为单位的,对吧?啊,这个大家要搞清楚就可以了,然后first,呃,不是first few,是当前value的even time,对,把这两个,这两个都是秒,直接拿出来,另外我输出一条信息,Logging feel在两秒钟之内。
08:27
啊,所以大家看到就是在这个输出的过程当中,其实我们是不是根本就没有管它到底是传进来的这个东西是几啊,所以在这种情况下,其实我们实现的仅仅是判断两次。连续登录失败对吧,如果要是三次的话,逻辑是不是就不应该直接这样输出报警了,如果三次的话,是不是还得就是我们这个next还得再next对不对,然后他们都时间间隔都在两秒钟之内的话,然然后这里边就输出这样报警,大家可以尝试着把这个做一个扩展啊,按照我们前面给传进来的那个最大的那个失败次数做一个扩展,然后呃,大家会想到这里是把这个。
09:16
如果小于两秒钟的话,输出一个报警,那假如说这个大于两秒钟呢,什么都不用做对吧?呃,但是大家注意,不管报警还是不报警,不管是小鱼还是大鱼,现在是不是都有一个新的失败事件来了。我是不是应该把当前的那个事件更新进那个state里边去啊,哎,所以这里边我们的状态呢,现在是一个。List state,那它怎么更新呢?大家可能知道这个list state要更新的话是一个,它有一个update方法,Update方法里边是不是必须得传一个list呀,对吧,就还得把我们要,呃,就是真正要写的这个数据,要把它包成一个list传进去才可以,大家如果觉得这个还有点麻烦的话,我们还可以用另外一种方式,什么样的方式呢?
10:09
是不是我可以先把它直接清空,然后。在ADD呀,这是这是不是比较比较取巧的一种方式,因为我这里边只有一个数据嘛,所以我就把直接把这些全清空,然后把最新的这个数据按进去就完事了。那当然如果要是出现这个,呃,就是多个数据要去保存的时候,那是不是我们必须要把新的那个数据判断,如果对的话,要添加进去对不对,然后符合一定条件的时候,可能再去清库,那个逻辑就比较多了啊这里边我们是更新。最近一次的登陆失败事件啊,这是这是保存在状态里了,对吧?好呃,所以整个的这一部分逻辑,大家看到这里为止,这都是。
11:10
如果已经有了登录失败。已经有那个之前已经有登录失败事件时候的处理逻辑,那假如说没有呢。Else else也比较简单,Else的意思就是说,是不是就代表我们之前没有过登录失败事件,现在是第一次登录失败这种情况啊,那是不是什么逻辑都不用判断,直接把对,直接把状态加进去就可以了,如果是第一次。登录失败,直接添加到状态啊,所以就是logging feel it直接把它ADD进去。
12:02
大家看是不是这样就可以了啊,这是我们在这一个process element里边用状态编程的方法,把它这个各种复杂的逻辑都已经理顺,把它写在这里边了,好,我们先看一看这个输出的结果是什么样吧,这里边正常的这个输出还是在这里面报警有一个warning对吧?呃,所以正常的话我们可以看到它的报警信息。哦,大家可以看到现在输出只输出一条,然后他报的是什么呢。他报的是什么?他是不是直接二和三这两条,它是不是应该是第三这条数据来了之后,马上就已经输出这个报警了呀,他就没管那个四对不对,哎,那后面为什么这个四就就没去输出呢?因为我们是不是一旦报警的时候,直接要把这个东西要清空的呀。要清清空的话,那是不是后面四来了之后,它又变成第一条了,所以后面就不报警了,对不对啊,所以大家看现在的这个实现逻辑是这样的。
13:10
呃,现在这个四四这里是success,那本来就不应该报警,对吧?啊,这个本来就不应该报警,那大家会想到,如果我把这个改成feel呢,改成feel的话。大家可以看到我们改成SE之后,它的这个预测结果,呃,它的这个输出的报警信息的结果是输出了两次,也就是他检测到4243,这是两秒钟之内连续登录失败,4344是不是还是两秒钟之内连续登录失败啊,他就不管,呃你这个到底是这个多长的时间间隔还是什么样,只要出现连续两次登录失败都给我们检测出来了,诶大家看这个符合我们代码里边的逻辑吗?对,其实是符合的,因为我们当时的这个操作是怎么干的呢?对,我们是先清空之后,是不是还是把最近一次的这个添加进去了,也就是说即使这里边小于两秒钟,我们报警后边是不是也是要把最近一次的这个添加到状态里面去啊,也就是说上一次这个四三来的时候发现报警了,清空了,是不是还要把四三再放进去,对,把它当成最新的那一次登录失败再放进去,那四次来的时候是不是发现他俩又符合了啊,所以当然是符合我们这里边定义的逻辑的,所以大家看从逻辑上看是不是比我们之前。
14:37
呃,定义的那种方式,定时器到两秒钟才去触发的这种方式看起来要顺一些,要舒服一些啊,呃,整体上来讲看起来是解决了一些bug,但是这个代码有没有问题呢?有问题,而且问题还很大,首先一个是在过程中我们已经发现了,这里边是不是已经我们相当于是在逻辑里边写死的,就只能是判断连续两次啊,你如果要是连续三次,连续四次,我们这里边的逻辑就不一样了,对不对?
15:09
啊,所以现在这个传入的参数,这个max few times其实是没起到太多作用,如果这里边大家想一想啊,如果要是变成三次的话,我们怎么做。三连续三次登墓失败怎么做?那是不是我首先得判断里边有没有对吧?如果有的话,是不是还得看之前是有几个啊对吧,至少得有两个,然后如果有两个的话,那然后我再判断他们之间的那个时间戳是不是都符合在两秒钟之内,对吧?呃,类似的一个逻辑,那然后如果要方便这个反复应用的话,后面我是不是可以把它全清空,然后再把最近的两次连续登录失败放进去,然后就等最后一次来了就报警,对不对?啊,可以把它变成这样的一个模式,这个其实也是可以做到的,而且大家会想到你要是做的再复杂一点,可以把这个抽象化出来,对吧,这个逻辑就可以用上这个max times了,这是一方面。
16:13
另外还有一方面,大家想到其实在实现的过程当中,有同学已经发现了,这里边我们直接就是说even time要小于这个第一次加二的这个时间,那对有同学说,那你假如说后面我们有乱序数据吗?现在我们的这个数据例子本来里面就有乱序,假如它出现前面那一次失败的时间比较大,后面这次比较小呢。后面这次比较小,那是不是这个就永远它会报警了。即使他们两次登陆失败,隔了很久,隔了一个小时,你也发现,诶,当前的这个时间,后面这一次来的时间是不是比之前的那个要小啊,当然一个小时的乱序时间,这个这个有点夸张啊,就是至少就是比方说隔了十秒钟之后啊,那再来了一个这个乱序的数据,大家发现还是触发了这个报警的,但这不符合我们的逻辑。
17:05
另外还有一个就是说,大家会想到,在这个过程当中,如果要想去处理乱序数据的话,其实就麻烦很多了。为什么呢?因为乱序数据又涉及到他俩之间,是不是我们处理的时候是先来了一个又来了一个,哎,我们知道这是连续登录失败了,那实际是不是并不一定他俩挨着啊,他俩中间是不是还有可能会插入一个success呀?那你这种情况要怎么处理呢?大家就发现这个是不是就很难办啊,哎呀,这这个其实就很难办了,那我们有没有思路,其实也有思路,我们怎么去处理这样的事情呢。我们是不是还得把既然有乱序数据,是不是还得把watermark那个集市要利用起来啊,就是我们这里边触发它那个操作的时候,不要用这里边比方说它直接加二这个判断,就是触发我们最后输出报警的这个操作啊,而是要根据什么来触发呢?可能我们要用写一个。
18:12
还是用一个定时器,这个定时器是要按照walmark做延时之后的一个时间去出发,对不对,那这个时候可能之前该来的数据就都来了,那如果我这个时候触发的时候判断他之前所有的数据都已经是按照顺序来的话,那我就可以判断他是不是不是真正的做到了两秒钟之内连续登录失败。这样的话,就可以做一些更加复杂的判断。那这个过程大家只要这么一想,就觉得已经很复杂对不对,呃,那那更不要说,我们这里边还仅仅是两连续两次。你要是连续三次的乱序数据呢?连续四次呢,连续十次呢,哇,这个真是要疯了啊,这个如果真的我们自己去控制这么多状态,这么多逻辑的话,简直就无法实现,这个要爆炸了,呃,那。
19:08
大家想一想,我们怎么去处理这部分内容呢?很幸运啊,就是在这个flink里边给我们提供了比较高级的一整套这个库,或者说处理的方法,这就是我们接下来要给大家说的所谓的CP,对这一部分就是专门处理这种复杂事件这样的一些模式,匹配相关的一些问题的。
我来说两句