00:00
哦,前面我们已经针对这个需求做了改进,大家发现现在这个时效性更强了,那前面这种改进的方法有没有什么缺点的,其实也是有缺点的,首先大家会看到就是在这里边这个实现的过程当中,这里边我们其实之前我们那个二那个就是失败的次数是直接传进去的,对吧?而且我们的那个逻辑其实你直接改一下的话,你改成这个连续登录失败五次十次都可以直接改一个数就完事了,但现在这个可扩展性就差多了,对吧?你像我们这个是通过什么去判断当前那个连续两次登录失败的呢?完全靠这个if else逻辑对吧?那你说如果要是三次登录失败,大家已经可想而知,已经复杂很多了,对吧?三次登录失败,你就得那个判断,那个历史里边至少要有两次嘛,然后如果要是里边有一次只有一个登录失败,那怎么办呢?啊,那你也是在追加一个,但是你追加的时候,假如说他俩的那个时间差已经超过两秒,又怎么办呢?对吧,这又有很多if else的判断,那你说。
01:00
更不要说如果要是五次十次登录失败,这怎么扩展啊啊,这没法整了对吧,就感觉这个复杂度已经太高了,另外还有一个很麻烦的问题,其实之前我们有考虑到,诶,刚才其实有同学已经想到了啊,就我们在这个测试的过程当中,本身这个数据是是带乱序的,对吧,我们刚才是不是没有测这个乱序的情况啊,哎,那这里边比方说呃,我这里边在这个4243之间插一条,我直接给一个当前同样一个userz ID的一个数据啊,然后来一个success登录成功对吧?然后这里比方说我来一个四五。大家想一下,如果我来一个四五的话,我现在沃纳延迟不是呃三秒钟吗?正常来讲应该能应该能把这个处理,呃,能够搞定对吧?四五跟四三的这个乱序只有两秒钟吧,正常应该是能够处理的,但是大家想想之前我们这个逻辑能把它搞定吗?
02:00
这逻辑搞不定对吧,我们现在其实就是来一个数据就直接处理,来一个数据就直接处理,你有判断它这个,呃,就是当前这个时间是否跟那个water mark它的这个关系,我们根本没有去判断对吧?诶所以在这个过程当中,假如我插了这条这样一条数据的话,我再运行一下这个代码,大家可以看一看这个效果,大家能想到插了这个代码之后,就是正常来讲,按我们的要求啊,是不是424344,这还按道理还是连续三次登录失败对吧?哎,正常来讲还应该是,呃,就是有有这个失败的,这个报警的,但是大家看现在呢,现在就是只有4344报警一次,为什么?因为它插到4243之间了呀,我这里边就认为A42之后来了一次success,然后按照我们的逻辑是不是这样的,对吧?在这里边我们的逻辑就是前面一次,呃,Has next之前有一次,然后我判断,呃,当前接下来又来了一个呢,来了一个success,直接就把它清空了。
03:00
所以后边的这个四三再来的时候,相当于已经是一个空的list state了,那当然它就只能是先添加进去,只能再等后面的四四来才能再输出报警了,所以大家看到这样的话,这里边的输出结果就是有问题的啊,那或者有同学可能想了,那之前这个我们原始的这个啊,就把它全放到list set里边去做判断,这个能不能成功输出呢?对于乱序的情况也不能,对吧,我们让一下啊,再看看之前的这个状态能不能对于中间插了一个这个四五的这个success能不能输出报警信息。看到这里边输出的结果相当于还是只能检测到,诶检测到我们这个后边4344这里边的一次报警的这个结果,对吧?啊,因为前面那个四二,然后后边来了这个成功之后,我们是把那个定时器已经删了嘛,对吧,直接删了,所以这个报警是哪里输出的呢?是四三来了之后又注册的那个定时器,对吧,他判断我们定时器已经清掉了嘛,然后再注册一个,最后报警,报警出来这样的一个结果,所以我们这就本来我觉得应该是四二到四四,这是连续的三次登录失败,但是现在根本就检测不到,那怎么样解决这个问题呢?首先有一个想法,可能大家能够想到啊,在这里边我能够怎么干呢?你既然是对这里边如果说有这样的乱序数据的话,那是不是就相当于我数据刚来的时候,还不能直接去做操作,对吧,因为我当前是有乱序的嘛。
04:38
你直接来了一个845这个success的数据,不代表说842和845之间就没有数来了,不代表说我费之后就直接来了一个success对吧?哎,所以这里边我得怎么样延迟得等对吧?哎,那一个想法就是我等到什么时候呢。最简单的一个想法就是我当前等到water mark真的长到我自己的这个时间。
05:06
大家想想是不是这个道理,能能理解这个含义吗?就是我当前845数据来的时候,Watermark是多少?我现在延迟三秒,那应该是对只到842对吧?所以我怎么做呢?就是842它这条数据和之前啊,之前所有的那些数据该做的操作我可以去做了,比方说A842来了一个费,对吧,我这里边的逻辑是直接要把它添加到这个当前的这个list state里边,那你可以做这个操作了啊,就是到这个watermark涨到842的时候,我做这个842数据对应的那些操作,那我当前这个数据新来的数据是845啊,那怎么办呢?等着对吧,现在我给他来一个缓冲区,给它全放在缓冲区里边,一直等着,然后后面又来了843 843怎么样呢?继续等着,为什么?因为现在的water mark没到843对吧?那我并不确定842和843之间会不会再来一个success啊。
06:08
哎,所以这里边我还得等着,怎么等呢?等等到什么时候呢?我等到沃玛,假如说后面我来了一个四六,那是不是涨到843了,我ma到843了,那我就诶843可以从缓冲区里边拿出来去做判断了,走我们这个流程对吧?这843作为我接下来的下一次的这个事件来输入,哎,我判断哦,之前842已经有一次费用,然后接下来,哎,我做这个处理的时候,发现他俩的这个时间差还在两秒之内,直接输出报警,这个是不是就没毛病了,对吧?哎,那你这845什么时候处理呢?等到后面848来的时候,Watermark涨到845了,那是不是相当于之前的数都到齐了,对吧?那所以我就处理它,你之前如果要是说呃,这个已经打断了,之前的那个连续登录失败的话,那你该清空就清空吧,因为肯定不会再来中间插入别的数进来了,哎,所以一个简单的思路是这样,但是大家会想到这个处理方式,这也太复杂了点,对吧。
07:08
感觉这个我们要处理好多事情呀,一个是我们经过这个改进之后,我要处理就是扩展性的问题,假如说五次登录失败,十次登录失败,我得if else做好多判断,好多状态,另外还涉及到你这里面还有一个乱序数据的处理,有没有更简单的方式能帮我们直接搞定这些事情呢?诶,这里边给大家介绍一个,那就是flink里边给我们提供的一个包,或者是一个库,叫做cep哦,那所以说接下来这个cep它处理这种复杂事件的处理,Cep它是一个缩写啊,本身缩写它就是complex的event processing复杂事件处理这样的一个缩写,那它就是处理什么情景呢?就是哎,你先来什么事儿,后来什么事,对吧?就一组事件构成了一个复杂事件,对于这种场景它处理的特别给力啊,那接下来我们就在代码里边先来试一试啊,让大家领略一下CP的风采啊。那首先我们还是既然要用到一个另外的库,那首先我们应该要把对应的对包要引入,那大家看引入的包呢,就是flink cep scalela,然后后边给上scla的版本,下边对应的是flink的版本啊,我们知道现在我们都已经写好了嘛,这应该是一点十点一对吧,上面scla是。
08:33
点一二,所以我把这个依赖先引入到当前的po文件来,呃,Logging fill,我就直接在自己的这个子模块这个目录下边去引入就可以了,对吧,因为别的地方没有用到嘛,Depends,然后把这个dependency引入好,有了这个模块之后,接下来我们在下边啊,继续新建一个scla的object,然后当前这个又是一个改进,我们这个就叫做v cep啊,就是通过cep去实现的这样一个连续登录失败的检测还是没方法先放在这儿,然后这个大家就想到了前面要做的操作是不是跟之前应该是几乎一无一样啊,对吧,基本上没什么区别啊,所以接下来我还是啊,把前面创建环境,然后读取数据map成样例类,分配时间,出watermark,到这里为止全部copy过来。
09:33
先把这个包啊,对应的这些包圈引入啊,然后这里大家注意get class还是不要用这个了啊,另外环境这里改成下划线引入啊,影视转换啊,然后接下来大家看一下,就是现在如果要用CP去做这个操作的话,它的步骤是一个什么样的过程呢?啊,就是首先第一步先定义一个匹配的模式,我们看啊,就是当前所谓的这个CP复杂事件处理,它主要是怎么处理复杂事件的呢?就是自己定义出一组模式事件序列的模式来,然后按照这个模式这个模板对吧去套,然后看来的这个流式的数据,数据流啊是否符合我们这个模板啊,所以接下来我们首先要定义这个模板,定义一个匹配的模式,呃,那要求是一个。
10:33
好,登录失败事件后,紧跟另一个登录失败事件,那接下来大家看看这个这个模式怎么样去定义啊,我定义一个logging feel pattern,诶,那那大家可能会想到这个pattern,这这到底是个什么东西呢?这就要用到cep cep库里边给我们提供的一个类叫做pattern啊,当然我们这里边主要用的是它这个给我们提供的那个办生对象对吧?调用它里边的一个begin方法,然后大家看到它最后要返回的是什么呢?呃,大家看这里边得到的是一个group pattern对吧?呃,所以就是要返回的就是这样的一个模式,这个模式到底是个什么东西呢?后面我们看到它本来一开始必须调一个begin嘛,Begin大家知道这个字面意思开始了,这就是说你当前定义的这个模式,你不是要做做匹。
11:33
配一个事件序列吗?那这个事件序列以什么开始呢?就在这儿对吧?啊,大家看到这个定义的时候呢,其实还可以把这个类型啊,直接把当前数据流里面的这个类型定义在这儿,我们当前就是logging event嘛,然后里边它需要一个参数,这个参数就是当前,大家看这个写进来就不报错了,对吧?这个参数就是当前你这个begin啊,第一个事件我们检测到之后,给它的一个名称,就类似于一个name一样,就当前我们检测到的事件的一个name啊,那比方说我们第一个事件当然就是啊,第一次登录失败对吧,First few,我就叫一个这个名字,然后我把这个写到后边一点啊,大家看的清楚一点,直接点begin,然后那它是不是需要有条件啊,对吧?哎,当前你到底是什么样的事件可以作为这个begin呢?大家看我可以怎么样,可以where where是不是就代表条件?
12:33
就像我们做那个CQ做做查询的时候,你给一个VR条件一样,那这里边大家看它给的是什么,里边传的这个就是一个一个condition返回布尔类型的一个函数,对吧?作为当前筛选的一个条件,这跟filter不是一模一样吗?所以我们这里边直接就是要什么呢?当前的type是不是必须得等于few啊,这不就是第一次登录失败吗?对吧?哎,只要我检测这个数据啊,来了一个even type等于few,那我就把它放在这儿,这叫第一次登录失败,放在这儿,然后接下来还得怎么样呢?还得紧跟着另一次登录失败,对吧?所以这里边我得定一个紧跟着,怎么样定义紧跟着呢?大家看这里,这里有一个follow follow的BY对吧,跟在后边,但这个不是紧跟着,因为follow by大家知道它其实是什么,就跟在后边就完了,对吧?啊,有可能这个中间还隔着别的人,所以紧跟着是什么呢。
13:33
Next next字面不就是下一个对吧?啊,这就是紧挨着排队的时候,就应该是紧挨着的那个状态,然后里边大家看也得传一个string进来,对吧,传一个名称进来,那我们这里当然就是第二次登录失败了,Second feel把它定义到这儿,这是只给了一个名称啊,那到底什么条件呢?是不是我也有条件,还是要给一个even type,必须是fail才可以啊,哎,就是这样做对吧?然后另外我还有一个要求是什么呢?就是前面这两个事件,我检测到匹配到之后,他们有一个时间限制,时间要求必须在两秒钟之内,诶这里边大家看到它的这个模式里边有一个限制条件叫什么with in within in里边给一个time,给一个时间限制,哎,所以大家看这里边就是给的这个time啊,跟我们之前呃,设置的那个window in time.time啊跟那个是一样的,所以我直接给。
14:33
一个SECOND2写在这儿就完事了,哎,这就是一个patternython的定义,大家看,这就是我们说的两秒钟之内连续登录失败两次,对吧?它的这个模式定义的是什么呢?是在两秒钟之内,首先开始是一个登录失败,然后紧跟着又是一个登录失败,它是这么定义的一个模式啊,所以我们接下来,接下来下一步啊,第二步,第二步就是将模式应用到呃,这个数据流上。
15:16
这里边我们得到一个新的数据结构,呃,数据结构得到一个pattern的stream,得到一个这样的数据练习啊,所以接下来我们要得到一个这个pattern stream啊,啊就是叫pattern stream啊,不是这个patternent的stream pattern stream,那这里边我要引入一个要要调用一个方法啊,是CP点,我们先把这个引入大家看,就是flink cp scale,用scale里边的这个版本,对吧?Scalela cp点有一个pattern方法,然后这个pattern方法里面要传什么呢?前面是一个input的data streamam,后边是一个自己定义好的pattern,就是说哎,把我定义好的这个模式应用到这个流上,对吧?开始要做检测了,就这样的一个一个做法啊,那所以这里边我们就直接用之前的这个logging event stream就可以对吧?啊这但是这里大家注意一下。
16:16
我们要检测的时候还有一个条件,之前我们做了一个什么操作呢?是不是还应该要先做一个KPI啊,对吧?哎,应该要做一个分组,所以这里边CP做的时候呢,也可以直接去做分组,所以我可以直接定义这个KBY,按照uz ID去做分组,传一个k stream也是可以的啊,然后接下来后边把我们定义的这个logging feel pattern传进来,这就得到了一个,大家看一下这个pattern方法啊,得到了一个pattern stream啊,然后这个pattern stream又能又能做什么操作呢?点进来之后大家看,大家看一下这个源码里边的方法,大家看它能做什么呢?Select select select对吧?哎,基本上都是select select,另外还有一个process,所以大家会想到接下来它是不是通过某种拣选对吧?Select嘛,挑挑选拣选操作,或者说某种处理操作process,最后就又得到了一个data stream。
17:17
这就是我们说的这个绕一圈,最后还是data stream,大家类比是不是可以想到之前我们讲过的那个Li select呀,分流那个操作对吧?啊,先分流就是只给它先简单的盖一个戳,然后呢,通过select的方法再把它简选出来分开,真正的分成两条流,而我们现在呢,是先把那个模式应用到之前的这个流上面去,相当于先把我们定义好的那些一组一组的事件对吧?匹配好的是不是相当于就先带上戳了对吧?然后接下来是不是再拣选,再把对应的那些数据全拿出来做处理就完事了啊,所以整体的一个处理流程其实就是这样,好呃,那接下来我们就想到了第三步,那是不是就是要检出检出符合呃模式的。
18:17
数据流对吧?啊,然后接下来需要调用调用select的方法,好,那接下来我们得到一个这个叫logging feel warning,呃,Stream啊,然后接下来我们直接基于之前的pattern stream.select然后这个select里边大家看到这里边传的参数啊,就有又有很多种这个重载方式了啊,参数可以重载,然后我们看一下里边最简单的应该是哪一个呢?参数最少的最简单的啊,那应该这这个s select下面还有几种不同的实现方式啊,最简单的其实就是这个,大家看到直接传一个函数就可以了,这个函数是干了一个什么事呢?大家看输入的数据类型是一个map,是一个map数据类型,然后呢,得到的是一个哎,输出转换之后的数据类型,对吧?最终我们得到这个data stream r,所以整体来讲,大家可以认为这个select就是一个比较复杂一点的一个map转换一样,类似于这样的一个一个操作,那这里面的这个map又是个什么东西呢?这就是之前基于我们定义好的这个模式,不是应用到流上了吗?检测出来的那些数据,那大家想到那些数据已经不是一个一个的啊logging event了,对吧?之前我们都是logging event嘛,你现在检测出来这应该是一组一组对不对,这个就不能叫logging event了,那你现在检测出来的时候按照什么去处理呢。
19:56
我们就把检测出来的一组数据放到一个map里边,然后去做处理,然后得到的这个流里边的数据,相当于是什么呢?就是以这个map作为,就是我们检测好的一组数据啊,包装成的这个map作为数据,然后这样的一条流对吧?那这个流里边就是每来一个数据,就是这样的一个map里边就是我们检测好的前后的好几个好几个数据,对吧?然后下下面流是嘛,过一过一会儿可能又来一个数据,那这个数据又是一个map里面又是检测好的匹配好的好几个数据。
20:32
这就是这个处理的一个基本流程啊呃,那当然这里边要做的这个转换操作,那就得自己定义了,我们看到可以做的一个就是可以是直接传这样的一个函数,另外还有一种什么方法呢?哎,大家看到下边还有一个方法啊,就是。我们看到有一个直接可以传递。呃,这个应该是在select。
21:05
哦,这里边大家看到还有这个各种这个颗粒化传参对吧,传多个参数的这种方式,这里边我们还用不到啊,大家看到它可以处理这个timeout,可以处理超时的事件,这个后续我们用到再说,这里边我们主要就是传一个参数,就是传传这个一个,呃,就是一个map作为输入,然后得到一个输出结果就可以了,然后这里边还有一个地方,我需要给大家再找一下啊。大家看到在最上边,其实这一个方法,这个select方法,这里边传的是什么呢?也是只传一个参数,最简单的它传的是一个pattern select方式。那所以这个pattern select方式是干什么的呢?点进去看一下里边一个s select方法,又是传一个map进来,然后返回一个你定义的out类型,是不是就跟我们直接传一个函数一样啊啊所以这个这两种就是最简单的实现方式啊,就要不你实现一个patternon select方式,一个函数类,要不你去自己写一个匿名函数把它实现啊,所以接下来我们给大家写一个还是实现一个自定义的函数类这种方法吧,这种可能大家看的更直直白一些啊,所以我拗一个,呃,比方说这个我叫呃,Logging feel,呃,这个事件event match吧,把它匹配的那个检测到对吧,这样的一个类,最后我们要做的操作,那就是把直接的logging fill warning stream打印输出print出来,最后不要忘记env execute,这里边我们是login fill with cp状。
22:49
然后接下来关键就在于实现,实现自定义的pattern select function,要实现这个东西对吧?好把它定义出来logging feel match,它是pattern select function,诶大家看到它的类型就是我们说的啊,跟那个map很像对吧,就一个音input的一个output,所以这里边本身的这个input是什么呢?
23:25
当前的input是不是就是输入的数据类型啊,对吧?嗯,Logging event login event,然后当前输出的数据类型我们也已经定义好了,Logging feel warning warning对吧?哎,当前已经想要的就是这些啊,它里边必须要实现的就是一个select方法,然后我们看一下这个当前这个select方法到底是怎么回事啊,这里边嗯,大家看输入进来的是一个叫做pattern,这是参数名称啊,它本身呢,是一个Java的map对吧?然后这里边大家看既既然是map,那我就得看它的key和value到底是什么呀,大家看到key是一个string,然后value是一个logging event构成的一个list,这是怎么回事呢?哎,首先logging event大家知道这是我们的输入数据类型嘛,所以前边我们这里边数据啊,本来检测的时候,这这不是定义好了就是loging event吗?那你检测到的每一个就是我们说的。
24:25
这个按照序顺序啊,事件序列里边的每一个单独的事件都应该是一个logy event对吧?哎,这个是没问题的啊,我们直接放在这儿,然后这里边还有一个string string又是什么呢?这就是我们在这个map里边保存它的方法,这就是当前的这个name给的这个事件的名称,也就是说我最后检测出来,比方说啊,连续的有两次登录失败的这个事件,那我保存的时候到这个patternython里边到底怎么保存的呢?就是一个叫做first few对吧,这个K就叫first few,然后里边这个例子里边就保存了当前我检测到的那一个事件对吧,那个logging event,然后同样后边一个呢,就叫做second film,然后后边就是保存了第二次登录失败的那个事件啊,那这里面为什么有一个list子呢?啊,这个后面我们会给大家讲到,就是这里边定义这个检测事件的时候,它不光可以检测一个事件,还可以检测多个。
25:25
啊,所以说这里边给了一个这个例子,现在我们不用考虑那么多,这个例子里边只有一个对吧,拿出来就完事了啊,所以这个其实还是整体来讲还是比较简单的啊呃,那所以接下来我们来拿一下吧,对吧?呃,当前呃,这个匹配到的事件,事件序列就保存在这个map里,对吧?所以我们接下来就一一做一个提提取啊首先我定义这个first few event,哎,那first few event我们当然是从,哎,不是map啊,叫做pattern对吧,Pattern里边直接去get get什么呢?这个K叫做first few对吧,之前我们定义好的那个first,然后注意拿到的呢,是一个list对吧?哎,当然这里边就是本身这个list里边只有一个数,呃,你直接要去拿它的那个就是值。
26:25
直直接去取它里边的这个get get0,这样也是OK的,对吧,直接把它那个就是D0的位置啊,索引下标是零的那个位置直接拿出来是没有问题的,或者你用那个迭代器去拿也是没有问题的,对吧?哎,那这里边我们还定义一个second feel event,那同样用这个pattern get,当前get的就应该得是啊,就是second film对吧?啊那同样这里边比方说我这次用这个迭代器啊,Itator next对吧?呃,只有一个嘛,那直接拿拿出来就是它这都没有问题,那最后我们包装成一个样例类类型,要做一个输出对吧?啊,Loging feel warning做一个输出,哎,那这里边里边我们想要的是当前的user ID user ID,是不是它俩都是一个user ID啊,对吧,这个肯定是没毛病的啊,因为我们是做过key之后的嘛。
27:25
所以随便用一个UID,然后当然就是first few event的时间戳,这是第一次登录失败,Second few event的时间戳是第二次登录失败对吧,最后再给一个当前的一个输出的报警信息,这就是完整的一个处理流程,然后得到的这个数据呢,大家看就是一个logging field warning对吧,只要前面我们按照这个模模式啊检测匹配到的数据,最后都通过这个s select的方法转换成了样例类输出到了我们这个最后最后的这个流里边,最后我们可以把它在控制台打印输出了。
28:06
好的,接下来我们来测试一下,看看这个效果到底怎么样啊。运行一下。好,大家看到现在我们已经输出结果了,大家注意,现在我们的数据是什么呢?是中间,大家看这里边我们中间是插了一条success的数据,对吧?845是插了一条success的,但是你看到他给我们还是检测到了842,呃,就是842843843844连续的两次报警信息,对吧?啊,这两次都检测出来了,因为我们要求的是就是当前是连续两次登录失败检测出来嘛,啊那有同学可能想到,那你如果要去扩展这个连续三次登录失败怎么样去做呢?那这个其实很简单,大家想到我接下来是不是再来一个next,然后现在是third对吧,Third few再去定义一个,然后同样还是where里边这个even type等于feel,这样是不是就完事了?对吧,呃,当然这个三次的话,可能这个两秒钟太小了一点,我大一点啊,给一个五秒钟对吧?啊给一个五秒钟,然后后边这里大家就要注意。
29:14
啊,你你这里边如果去取这个我们当前的这个second field的话,那相当于这是第二次登录失败对吧?你如果想取这个最后一次的话,那应该是取这个third few,然后把这个拿出来,对吧?我可以把这个做一个实现啊,然后这里边是third few event,然后得到这样一个结果,然后接下来大家可以看一下,就是我们当前的这个运行结果是不是还是可以正常的检测到啊,那大家知道现在你如果要是连续三次登录失败的话,那就不可能输出两次了,对吧?诶肯定就是这个42434次,这是连续三次登录失败嘛,中间尽管有一个乱序的这个45SUCCESS进来,不会影响到最后的结果输出。
30:01
我们看一下,诶,大家看到四二到44LOGVING f对吧,按照我们当前的要求,三次登录失登录失败也把它检测到,所以如果用CP的话,它的可扩展性以及对于乱序数据的处理上,直接就包装的很很简单了,大家只要按照它的定义把这个模式能够实现,最后只要把它给一个这个s select方式啊,把它剪剪出来,输出结果就可以达到我们的目的,这就是CP好用的地方。
我来说两句