00:00
然后接下来我们要做的操作呢,第三步,那就是将模式。也就是pattern。应用到事件流上。检测匹配的复杂事件。诶,那所以这里边我们应用的时候到底是调用什么方法去检测呢?这里边cep我们看啊,本身有这样的一个类,我们看就是flink CEp.scla下边有一个CP啊,那我们同样使用它的半生对象调用它的一个pattern方法。哎,这样的话就可以传入我们当前的input的数据流,以及当前的一个pattern,我们看一下它的这个传参啊。传入的就是两个参数,一个是当前的输入事件流data stream,另外一个呢,就是我们前面定义好的pattern,把当前的模式应用到数据流上啊,那所以接下来我们首先第一个就是数据流login event stream,这里需要注意的是我们检测这个三次登录失败事件的时候,这里边呢,并没有涉及到对于这个用户的判断,诶,所以我们只管它这个事件到底是成功还是失败,那接下来我们检测的时候是所有用户的登录失败事件都混在一起去检测吗?诶,那当然不是这样的啊,我们是要依据每一个用户他们自己的行为去检测是不是连续三次登录失败,哎,所以这个时候我们应该把这个数据要做一个按键分组,当前的键当然就是user ID了,诶,那所以这里边我们首先啊,还是应该把它做一个分组。
01:38
K,然后里边选择的这个键,当然就是userz ID,这里边直接传入一个k stream的话,哎,那我们知道k stream本来也是data stream嘛,所以放在这儿这个类型是没有问题的,如果这里边做了K败的话,后边去进行CP啊,复杂事件检测处理的时候,那就是我们之前在文档里面看到的这个样子。
02:00
就是基于每一个键分别的去进行检测匹配啊,那这样的话就完全没有问题啊,后边当然就是把我们定义好的pattern传进去就可以了。那这个得到的东西是个什么东西呢?我们可以看一眼啊。这个类型是一个pattern stream啊,这又是一个特殊的一个类型啊,啊,这是一个所谓的模式流啊,那所以这是一个特殊的经过转换之后的数据流类型,哎,那我们把它叫做pattern。Stream。那最后我们肯定要得到的不是这样一个东西啊,要得到的肯定是还是一个data stream吧,所以怎么样能得到转换之后的data stream呢?哎,这就像我们之前那个做窗口操作一样啊,首先先定义一个窗口分配器,点window,哎,那得到的是一个window stream,接下来呢,当然就是针对这个window stream去定义窗口函数,那就经过处理转换之后又得到了data stream,现在也类似我们基于这样一个pattern。就可以去定义处理规则、处理转换的规则。将检测。
03:07
到的。匹配事件报警输出。啊,我们现在不是检测到了连续三次的登录失败吗?那就把它们提取出来,包装成一个报警信息,直接输出就可以了,所以接下来我们就是基于这个pattern stream,哎,能调用什么方法呢?我们看到它能调的就是select或者select select啊,那或者process对应的这些方法,所以他能进行的这个处理转换呢?呃,不像之前啊,我们的data vpi能够做各种各样的转换,Map filter Fla map啊,Process,那现在它就是一个select。那对应的这个看起来就像拣选做一个选择提取这样的一个过程了,啊,那最简单的当然就是直接去select了。里边我们看他要传一个什么样的东西呢?哎,这里有不同的传参方式啊,最简单的方式。
04:00
其实就是上面的我们看到啊,这里可以传入一个pattern select的方式。这个看起来像是一个函数类啊,它没有对应的拉姆达表达式匿名函数的写法啊,所以我们可以看一下这个pattern select function到底是个什么东西,哎,那当然这是一个Java接口了,里边呢,必须要实现的是一个select方法,哎,那所以这个抽象方法里边它又是干什么事呢?我们看到它里边有唯一的一个参数是一个map类型。这里的map类型呢,里边的k value是一个string,然后一个诶,我们当前输入类型的list。所以自然就想到了我们当前的所有这个事件不就是硬类型吗?所以哎,我们既然是检测到一组匹配的复杂事件,那肯定就是要把它放在我们当前的这个值里边保存起来的,那前面它的这个key string类型又是什么呢?诶,不要忘记之前我们在定义模式的时候不是有一个name吗?
05:02
就是有一个字符串类型的名称,所以我们要提取对应事件的时候,按照什么去提取呢?就按照这个name去提取。哎,那所以当前我们这里边就是你如果直接去获取这个map里边first few这个键对应的值的话,哎,那得到的就是第一次登录失败的事件,那如果是second费的话,当然就是第二次登录失败的事件了啊注意呢,这里边我们的value都是放在一个list里边的,现在的话,我们这里边每一个登陆失败的事件其实就一个值嘛,所以这个例子里边应该就只有唯一的元素。然后最后呢,我们包装起来可以输出一个out类型的数据,我们现在就检测一个报警信息就完了,所以就包装成string类型好了,所以接下来我们在这儿啊,直接用一个。匿名类的形式,把这个做一个实现pattern slept function,它需要传入泛型,这里我们看到啊,它的泛型就类似于my function一样,一个in,一个out,做一个转换我们当前的in。
06:04
那肯定就是原始的数据类型啊,Logging event out的话直接字符串打印输出,所以是string里边必须要实现的是一个select方法。那里边具体的处理逻辑呢?诶,那我们就把所有的三个登录失败事件提取出来,然后接下来我们最后打印一条信息啊,说检测到了连续三次的登录失败,然后呢,可以把这三次登录失败的时间戳做一个打印啊,那这样的话就一目了然,到底是什么时候三次登录失败啊,所以接下来我们首先先获取一下获取。匹配到的。复杂事件。哦,那首先我们就直接定义一个叫first few吧。怎么样去获取呢?诶直接从这个map里面不是保存成一张map了吗?哎,那一个P一个value,所以我们就直接map去做一个get。
07:00
现在get的第一个登录失败的事件,当然就是前面我们定义好的这个first few这个名称,这里定义的这个K是要跟之前模式里边定义的名称一一对应的啊,那就按照这个去进行保存,这个get到之后呢,他拿到的是一个list。注意啊,当前我们这个例子的并不是保存了三个登录失败事件,而是。只保存了我们当前第一个登录失败事件啊,所以这个例子里边就只有一个唯一的值,那这里面要获取它的值的话,直接GET0不就完了吗?啊,就唯一的一个值啊,就第一个值拿出来啊,那同样道理,我们可用可以用同样的方法去获取第二次登录失败和第三次登录失败。那这个叫做second,下面这个叫做third。哎,那同样后边我们需要把。对应的这个名称也都改过来。把这三个事件先都拿到之后。接下来就。
08:02
包装报警信息。输出,那这里的输出啊,也不用专门去写啊,因为直接返回的字符串就是我们的输出结果吧,所以这里面我们就直接做一个字符串拼接好了,呃,那首先我们应该说到底是哪个用户发生了这样的一个连续三次登录失败的行为啊,做了一个报警,所以我们这里边就先把这个当前用户的ID先写出来,哎,那这个ID的话随便找哪个都行,对吧?First few,我们把这个UID拿出来。然后接下来我们可以说它。连续三次登陆失败。啊,那他的登录时间。分别是接下来我们可以直接写。First field time step以及。后边可以跟上second field time。另外还有。Third field time,哎,这样的话就包装好了我们最终的报警信息。
09:05
好,那如果已经得到这个结果的话,我们现在可以看一眼啊得到的。它的类型当然就回头变成了一个string类型的data stream啊,所以这里面我们可以叫做result stream。或者说叫做warning street报警的这条流啊,那最后就可以把它做一个打印输出,看到最终的结果了。那整体如果要运行的话,不要忘记还有。因为直接要执行起来,好,接下来我们可以运行一下,看一看是不是能得到我们预期的结果。好,已经运行起来,我们看到哦,果然我们这里的检测就得到了,USER1连续三次登录失败,登录时间分别是两秒、三秒和五秒的时候,那U2的话,A在这里就没有被检测到,因为它中间插入了一行success。那如果说我们把中间的这一行啊,就是success啊,如果要是改成feel的话,那接下来我们重新运行一下,看看效果会是怎么样的。
10:10
那我们会想到现在UR其实有四次登录失败了,诶,那我们看到它就会检测到连续三次登录失败输出报警信息,而且会输出两次啊,这就是我们测试的结果啊,那当然了,对于我们这个例子而言啊,并没有去考虑当前这个在多长的时间范围内,另外呢,也没有考虑到当前这个时间,假如说啊,出现这个乱序的时候,到底能不能正常处理啊,那我们其实会想到,对于CP而言,只要我们定义了这个事件时间,然后提取了时间戳和水位线,那接下来呢,肯定都是能够正常处理时间相关的内容的。这就是我们使用cep对于连续登录失败事件的一个检测。
我来说两句