00:01
我们现在已经了解了整个CP当中Python API的用法啊,那这一部分呢,其实是CP的核心,也是最麻烦最复杂的一部分啊,不过呢,在整个我们进行复杂事件处理的过程当中,Patternon的定义只是第一步啊,就像之前我们在代码当中实现的这样啊,定义完了patternon之后呢,接下来还要把这个模式要应用到事件流上去,检测匹配的复杂事件,然后呢,呃,检测到的事件还要再提取出来去进行对应的处理转换,最终得到一个我们结果的data。啊,那这才是整个复杂事件处理的完整流程,所以接下来呢,我们要考虑的就是针对模式的检测处理啊,那首先第一步我们说啊,将模式应用到流上,这个非常简单,直接调用CP.pattern方法,里边传入的参数呢有两个,一个是当前的输入事件流input stream,另外就是我们定义好的pattern。
01:02
这里需要注意的就是说我们这里传入的这个data stream啊,可以是经过KBY之后的k stream,而且一般我们在处理具体需求的时候呢,往往都是先要做一个KBY,因为都是根据不同的K啊进行分组之后,然后我们才判断它有什么样的连续事件啊,比如说针对这个不同用户,每一个用户他连续做了什么样的操作,就可以检测到他的行为模式啊,所以一般这个KY这一步操作是必须的。另外呢,在源码当中,其实我们也可以发现啊,调用CP.python方法的时候,其实有两种传参方式啊,一种的话就是直接传入两个参数,一个是data stream,另外一个是pattern,那此外呢,还可以传入第三个参数,第三个参数是一个isn eventt compar,也就是说是一个事件的比较器,那它比较的是什么东西呢?哎,其实这就是说想要正确的处理事件的顺序,那很显然我们应该知道到底哪个事件先发生,哪个事件后发生。
02:07
所以我们知道,如果在处理时间语义下的话,那当然就是按照当前这个数据到达的先后顺序就可以了,但是如果要是在事件时间语义下的话,哎,那就应该要按照各自的事件发生的时间,也就是他自己的那个时间戳去进行排列,这样的话才能正确处理乱序数据。另外,如果说时间戳相同的事件,我们又应该按照谁先谁后的标准来进行判断呢?哎,那这个时候我们就可以单独去指定这样一个比较器,告诉link底层啊,告诉CP到底应该是谁在前谁在后啊,那这样的话,最终我们得到的一个结果就是这里的patternent stream啊,那就是这里我们所说的啊,得到这样一个模式流。那这样一个模式流呢,接下来就可以去做各种各样的提取和转换的操作。
03:01
我们在源码当中可以看到这样一个pattern stream模式流,里边可以调用的方法其实主要就是这几个啊,有各种各样不同的传参方式,那我们综合一下看的话,那主要就是有一个process方法,然后呢有一个select方法,另外还有一个select select方法,至于这个in processing time和in even time,这明显是在指定时间语义啊,那真正进行提取复杂事件,然后进行处理计算的只有process select和SELECT3种方法。那我们这里可以看到啊,如果做一个简单的划分的话,那可以分成两类,一种就是类似于基本的转换操作,一个select,一个select select,这两个显然就是基本的提取和转换的过程,那另外还有一个呢,类似于之前我们提到的底层API处理函数process方式,这这种调用方式啊,那就是点process这样一个方法。所以接下来我们就分别来看一看他们到底是怎么样去使用的。
04:01
其实在这里我们可以看得很明显啊,这里CP里边pattern stream对应的这种处理方式呢,跟之前我们所说的data stream里边的调用方式非常的类似,它里边传的参数也是一个函数类啊,比如说这里的select方法要传入的就是一个pattern select方式。那如果是flat select的方法呢?传入的就是一个pattern Fla select方式啊,那如果是process,要传入的就是一个pattern process方式啊,所以对应的都是相应名称的一个函数类,那然后我们看一下最简单的select方法啊,里边传一个select方式,它到底是个什么东西,我们可以点到源码里面去看一下,这本身是一个Java接口,里边有一个唯一的抽象方法,就叫做select。然后诶,这个我们也已经知道了啊,之前在代码里边会看到本身我们在使用它的时候呢,呃,那里边传入的唯一的一个参数叫做map,它是一个map类型,那里边呢,是以k value的形式保存了我们当前检测到的匹配到的复杂事件,那每一个复杂事件,这里的K针对的就是我们之前在定义模式的时候指定的个体模式它的名称,而对应的这个值呢,因为存在循环模式啊,所以我们这里边是以一个list,用一个列表来表示检测到的每一个个体模式对应的事件的,有可能是一组事件。
05:32
那当前这个select方法有一个唯一的返回值,那就表示我们把检测到的这一组复杂事件到底要经过转换处理之后得到一个什么样的东西输出到下游里面去。那经过处理转换之后,当然得到的就重新回到了data,哎,那所以当前我们在进行处理转换的过程当中,啊,这个select就有点儿像一个map转换,只不过我们一般情况的map呢,它是一个。
06:05
一对一的这样一个转换,一个输入就对应着一个输出,而我们现在呢,哎,那相当于有点像是一组输入啊,各种不同的事件打包在一起,组成了一个大的输入事件,然后把这一组大的输入事件我们放在一个map里边。然后接下来做一个类似于web转换,得到一个输出,最终生成了一个新的data,啊,这就是在CP当中对于匹配事件选择提取的过程。当然了,在这里除了直接调用select方法,我们还可以去调用另外两种,一个是select select方法啊,那对应的这个方法,我们顾名思义啊,它所要实现的这个pattern Fla select function式里边,哎,那对应的一个方法就是Fla select,然后呢,我们看到它并没有返回值里边传入的参数呢,那就除了一个map类型,当前捕获到的所有匹配到的复杂事件之外,另外还有一个collect,所以这个Fla select跟之前s select的区别,这不就相当于flat map和map的区别吗?诶,那就是我们可以以检测到的所有的匹配事件来进行结果的包装输出,那输出的呢,不一定是一条数据,可以输出多条数据。
07:23
那只要调用这里collector对应的那个collect的方法就可以了啊,所以整体来讲跟我们前面讲到的那个data stream vpi里面的调用是完全一致啊,哎,那同样另外还有一个process process里边要实现的传入的是一个pattern process方式,哎,那我们点进去会发现这里边同样要实现的是一个process match方法,哎,那它的参数呢就会更多一点,同样它也没有返回值,跟前面我们讲到的Fla select是一样的,所以它的返回呢,也要通过一个collect去调用它的collect方法去进行收集,可以输出多条返回数据,那除此之外它还多了一个参数,那就是。
08:06
一个上下文啊,这跟我们处理函数里边是一样的啊,哎,那所以这里边这个上下文能够干什么呢?我们看到它又继承自time context,这个time context可以捕获到当前跟时间相关的一些信息啊,那当然了,这个上下文是不能去注册定时器的啊,呃,不是我们处理函数那么底层的一个接口,但是呢,它可以做的事情就更多,那除了这个时间相关信息之外,另外它还能调用一个凹凸的方法,那这是用来干什么的呢?这跟之前我们处理函数底层的那个调用是一样的,它可以用来做测输出流,也就是可以进行分流操作了。Process啊,对应的这个patternth process function,这是flink1.8之后引入的一个方法,引入的一个接口,那整体来看的话,它比select和select select就会更加的通用,所以一般情况呢,我们直接用这个process就可以了啊。
09:04
这也是目前官方比较推荐的用法啊,那有了这些知识之后,接下来我们其实就可以考虑怎么样把之前的这个login field detect代码做一个改进了,因为之前我们说如果说啊,不停的在后边next next这样去定义的话,这个扩展性其实很差的,哎,那有可能我们这个有100次连续登录失败的话,那这个代码量就非常大了啊,最好的方式当然还是使用循环模式去做一个定义啊那。定义了之后,后面我们怎么样去检测它呢?那接下来我们就另外实现一段代码,来对这段代码做一个改进啊,那对应的这个名称,我们还是创建一个scla的object,就叫做logging field detect。Pro一个改进版本的代码,那整体的处理流程呢,我们可以直接从这边先copy过来,基本上是一致的。
10:01
哎,那同样上面这个流处理里边啊,我们这个流式执行环境对应包的这个下划线先要引入,然后接下来首先我们要定义这个pattern,这里就可以做一个更改了。整体来讲就是做一个精简。那我们这里也不需要说这个是first few了,我们直接就把它叫做few吧。We are even type等于few,接下来后边跟上一个。TIMES3,另外后边还要跟上一个,因为我们现在是严格禁邻,哎,所以必须要连在一起,紧挨着那跟上一个consecut。接下来我们把定义好的pattern应用在事件流上,那肯定还是一样啊,做K外之后把pattern传进来,接下来呢,就是要看定义这个处理的规则了,诶,我们之前用的是select,现在我们干脆直接用一个更加通用化的process吧。里边要传入的就是一个pattern process方式,那当然了,这里也需要有对应的这个泛型,我们看到这里的泛型跟pattern select方式是一样的,就是一个input,一个output啊,输入输出的数据类型,所以这里面我们的input是。
11:10
Logging event输出,那就还是string包装成字符串打印输出里边必须要实现的一个方法是process match方法。当然里边具体的这个提取每一个登录失败的事件,然后包装我们最后要输出的报警信息,这个过程看起来跟之前也差不多啊,所以这一部分也不需要删掉。我们直接把这个先。Copy过来,然后看一看到底要做怎么样的一个更改,因为我们现在定义的这个模式已经不一样了,所以接下来去提取检测到的复杂事件的时候肯定也有所不同,因为首先这里就没有first few second few啊,诶,那我们这里只有一个,那就是few。那这里我们会发现,如果说这里面直接去get f之后拿到的还是只有一个元素的例子吗?啊,现在就不是了,因为现在我们是循环模式,所以理论上来讲,匹配到的三个连续事件都会放在当前的这个例子里面。
12:09
所以我们根据feel这一个K去做了get之后,拿到的是一个具有三个元素的列表,诶,那接下来我们GET0,当然第一个元素就是first few,哎,那同样道理。GET1,拿到的就是第二次的登录失败second啊,同样道理,还是同一个K去获取复杂事件列表,然后GET2。获取第三个元素,那么得到的就是第三次登陆失败事件。啊,那提取出对应的事件来之后,接下来我们就可以包装信息,然后做一个报警信息的输出了,那输出的时候呢,诶注意就不是直接返回了,那是要做一个哎,Collect方法的调用,我们这里用于输出的这个参数啊,Collect就叫做collect调用它的collect方法。然后把包装好的报警信息直接放在里边就可以了,这就是我们完整的一个调用过程,好,接下来我们可以运行一下,看一看是不是得到的结果跟之前还是一样。
13:16
好,我们可以看到啊,检测得到结果果然还是USER1能够检测到连续三次的登录失败,而USER2因为中间有success,所以就没有输出报警信息啊,那我们就利用循环模式和严格近邻的限定条件,把对应的模式做了一个更改,然后对应着呢,我们在提取检测到的匹配事件的时候,方法也会有所不同,哎,那这就是关于CP当中检测和处理匹配事件的过程。
我来说两句