00:00
哎,我们就把讲过的这个CP相关的理论用在实践当中,我们对之前的这个登录失败检测做一个CP的实现,好,我们还是在这个包下边去new一个object啊,当前我们这个叫做logging,呃,Fill with cep,好,先把它创建出来。然后这里边需要的一些东西,其实该有的都有了,对吧,而且这里边大家注意一下啊,现在既然要用CP了,一定得保证我们这里边已经引入了flink cp scale,对吧?得把相关的这些依赖先引入,这里边已经有了,所以我们直接写就好了。这里其实整体流程是不是应该非常类似啊,大家想一下,就是前面至少应该是差不多的对吧,所以。我就直接从这里边先把前面的这个环境创建出来,然后读取事件数据,然后一直到大家看这个,把它包装成logging event,然后分配分配这个时间戳到这儿都没问题,对不对?好,甚至KY是不是也一样啊,我们即使是这个事件匹配是不是也是要基于每一个用户来做的呀,所以后边这个甚至可以移植到这个KY啊,这里边先把它。
01:24
全copyping过来做一个实现,我们把这个先改过来啊,引入对应的那个影视转换,后边还需要影什么?哦,这里有这个bonded out aboutness。好,该引的都引入,然后接下来是不是应该去做一个K啊,呃,我们是针对当前当前的user ID去做了一个批改,好,这是读取事件数据。呃,创建。
02:03
事件流简单事件流对不对。好,我们把这个叫做第一步,然后接下来大家会想到第二步我们要做什么。第二步是不是就得去定义那个对应的模式了,对吧?因为当时我们说有了data STEM,有了事件流,然后再有了模式,后边是不是就可以去把模式应用到事件流上,然后接下来就可以得到一个pattern stream,从pattern stream上再去应用一个select的方式,就可以把对应的东西选出来了,对吧?其实就这么几步,四步吧,大概,所以这里边接下来去定义匹配模式。呃,所以login fair pattern这个模式需要去调用cep里边,大家看cep sc里边的pattern对吧。
03:02
这里边是不是首先就应该是一个begin开始啊,演播室序列begin开始,这里边我们一般要先把这个就是数据类型先指定清楚啊,就整个过程当中的数据类型,我们是不是都是logging event啊。然后里边。是不是应该得有一个名字啊,这个比方说我们就叫begin吧,或者大家叫start什么都可以啊,Begin这个简单的模式应该有一个什么样的条件呢?是不是必须得是type,得是feel啊对吧,我们就这么一个条件对吧,所以后边直接来一个where。Even type等于。好,这就是我们第一个简单简单事件,第一次登录失败对不对,检测到了,然后接下来我们要连续登录失败,所以要哎接下来的事件,那有两种定义模式,一种是严格警铃,一种是就是宽松的禁铃,对吧,我们现在是严格的还是宽松的,对,现在是严格的,因为你中间一旦有success不就是是不是就不属于我们报警的这种情况了,所以接下来一定是要next。
04:18
这个是严格精灵对吧,这里边我们给一个名字,比方说就叫next吧,或者前面这个叫first,我们后面叫second是不是也可以啊,对吧?所以这是第二个失败的事件,那威尔他有没有条件。它的条件其实也是,是不是type要等于fail啊,啊,所以这个条件都是一样的,那当然如果说我们的要求是说后边要是连续三次登录失败,四次登录失败可以不可以,甚至我们可以直接用什么呀,是不是可以直接用那个times啊,大家还记得有一个times吗?对吧,可以直接用那个循环,然后变成一个这个循环模式,其实也是可以的啊,这里边我们就直接begin connect nes,因为我们这里边可能想要什么呢?是不是得拿出来第一个和第二个事件他们是不同的呀,对吧,其实我们想要拿这个东西好,这里面还需要什么操作吗?
05:17
我们的需求是是不是还得在两秒钟之内,所以还应该有一个时间的限制,Within啊,这里边要传一个time.SECONDS2,所以大家看模式就这么简单。就这样就把这个模式定义好了,数据了,数据有什么问题吗?就是他两个数据可能不是明的啊,对,它可能不是警邻的来是吧?呃,你的意思是说,就是我们设置了事件时间之后,它有可能是乱续来的,对不对,就是说比方说我们前面的这个例子里边有可能出现什么情况,就是这里边四二的时候fail了,比方说四三的时候fail了,比方说是这个四四的时候是success,但是四四在四三之前来了。
06:08
诶,它能不能检测出来,这是我们真正关心的,对吧?哎,没关系,大家先把它放在这儿,等会儿我们测,大家知道它能不能实现了,其实大家想,如果说他这里边要能,就是处理我们的那个事件时间的话,他是不是一定得有这个能力啊。他其实完全可以做到,为什么可以做到呢?因为我们现在的时间,我根本不考虑别的,我只考虑事件时间water是不是就可以了,我只看water mark进展到什么就可以了,我并不管你哎之前的那个状态到底是什么样的,当然了就是这个water mark你得设计的稍微合理一点,对吧,就是基本上能不要出现这种丢丢事件的这种情况对不对,要不然的话,这个可能就会出现一些问题。好,这里边我们已经定义好了模式,接下来第三步做什么?是不是要?
07:02
对,在事件流上应用模式得到一个pattern stream,对吧?哎,所以这里边我们定义一个叫做stream的。那么它要调用的就是CP下边的,我们把这个引入啊。Cep下边有一个pattern方法。然后它里面有两个参数,大家看到一个是一个input的data stream,对吧,另外一个就是我们定义好的pattern,所以。我们定义的那个叫什么来着,Logging event logging event stream,后边的pattern叫logging fair pattern,直接把这个传进去就搞定了,对不对,这一步搞定了,然后接下来稍微的麻烦一点。
08:02
是不是就要。从pattern stream上。应用。Select function。然后是不是要剪出我们想要的那些匹配好的事件序列啊,剪出事件。匹配的事件。序列啊,所以这里边我们定义一个最后剪出来的,那就相当于是logging了,对吧,Logging field data stream,它就应该是用patternon stream.select方法哦,这里边大家看到select的方法里边可以传一个自定义的函数也可以,是不是可以去new一个select function啊啊,这里边我定义自定义一个啊,叫log log in。叫match吧。
09:02
如果匹配的上的,我就直接用这个方法select function去做处理,把想对应的想要的那些数据结构是不是取出来,最后得到这样的一个data stream,最后我们可以把它做一个打印输出log logging fields。当然同样也不能忘记要做一个执行,对吧?Logging feel with cep知好。所以接下来的核心其实就在于。是不是要实现这样一个自定义的select方式啊,啊,所以那这个东西到底又是个什么呢?来,我们把它写出来就简单多了,Logging feel match。他要去实现的接口是。哎,我们讲的叫pattern select,大家看是不是有这个东西啊,是一个叫pattern select function的东西,那么它里边又是什么呢?
10:03
它里边是不是要传一个input和output的一个一个数据结构啊,也就是说经过我们这里边的一个一个选择检出,到底你想把检测到的那些事件,那些序列包装成一个什么样的输出数据结构输出对吧?那你这里面的out就是我们最后打印出来的这个logging field data stream的数据类型,所以这里边输入应该是什么类型?还是那个样例类类型对不对,Logging logging event,那输出其实我们也定义好了,是不是就是warning啊,对吧,这里面这个warning,所以。直接把它实现就可以了。好,接下来。大家看一下这里边必须要复写的一个方法啊,重写的一个方法是什么?是不是就是select方法啊,所以大家看这个select方式里边就实现一个select方法,那这里面它的参数是什么呢。
11:01
是一个map,就是前面提到的,它是把什么保存成了一个map呢?检测到的所有的事件序列是不是就存成了一个map呀?这里边检测到的这个所有的事件序列是什么样的呢?是不是就应该是一个string哎,这里是不是应该begin对应的有一个事件啊,Next是不是也对应有一个事件,所以是不是相当于begin对应的就是一个第一次fail的那个事件?Nes是不是对应的就是第二次feel的那个事件啊,哎,所以就在这里边都把它保存成了这样一个map。那么begin,呃,这里边是一个这个这是一个list,对吧,这不是terrible了list,而如果要是next的话,后边是不是也是一样的一个数据类型,所以这里我们可以直接从里边把它取出来。因为warning里边我们不是要知道第一次和第二次登录失败的时间嘛,啊,所以把它们取出来啊,定义好,从map中按照。
12:06
呃,名称。取出。对应的事件,好,那么首先我们first few,大家知道是不是要从map.get什么呀?是不是要get begin,然后接下来。呃,大家会发现这个这个list是可以get它的某一个位置的,对不对啊,如果我们要是这个不放心的话,本身它也是一个inter类型,是不是可以去调这个迭代器啊,对吧?直接用它的next是不是就可以了,就把我们当前存着的那个值拿出来了,呃,因为你如果直接指定那个呃,数组的那个就是角标的话,有可能会越界对吧,这个就是,然后我们还有一个second,呃,这个不要叫second。或者叫last吧,Last few map.get nes定义好的,同样terrable类型nes的拿出来就可以了。最后我们要把它包装成一个。
13:17
Warning输出warning里边的数据结构一开始是user ID对吧?啊,User ID就简单了,是不是这俩应该ID都一样啊,你随便拿哪个都行,所以first few.user ID,然后是第一次失败的time,第二最后一次失败的太。然后还有一个messagesger,是不是我们这个就直接输入一个logging file就知道怎么回事了,对吧?啊,这就是我们最后的输出的一个一个状态啊,这样的一个结果,哦,上面这里报错是因为我们这里写错了,不是Python pon不能调print对吧?是什么要调print呢?对我们最后得到的field的这个data stream调print,这样把它打印输出,就应该可以看到我们检测出来的那个具体的结果。
14:10
好,这里边给大家跑一下,看一看效果怎么样,看到这里边已经检测输出结果了,它的检测的出来的方式是不是跟我们前面做改进之后的那种状态编程有点类似啊,对吧,它是不是也是检测到,只要检测到就直接会输出,对不对啊,它这个模式也是这样的啊,就只要检测到就会去输出,所以这里边我们的数据是连续的三次登陆失败,而且都在这个两分钟之内,它是只要检测到符合的连续的两次,它就输出一个报警信息。那大家比较关心的其实是什么呢?是这个改成success它还行不行,对吧?啊,当然这个改成success肯定没问题,大家关心的是乱序数据他搞不搞得定对吧?哎,这种情况下他搞不搞得定,我们再来跑一下试试。
15:04
看现在的输出结果是不是还是正常,检测到了四二四三两两次连续登录失败的这种模式啊,中间我们即使是插了一个乱序数据,Success的这个四次是不是对他完全没有影响。对吧?啊,所以大家如果感兴趣的话,可以用其他的数据去反复的去测一测,我们可以在大片的这些登录数据里边检测出,而且是乱序数据里边检测出对应的这个模式,这就是CP的一个基本实现,所以大家看从复杂程度上来讲,是不是我们用CP实现,至少是比前面那个好理解多了呀,对吧,前面你直接写这个模式的时候,这种方式大家看着就好像跟C或者说table API那种方式有点类似了,所以这个就更加好理解一些啊,这就是cep这部分的一个实现。
我来说两句