00:00
使用Python API定义好了整个的组合模式,模式序列之后,那接下来我们就可以把当前的模式应用到数据流上,然后去检测复杂事件,然后进行处理了,所以接下来的下一步就是要做模式的检测处理啊,那接下来对应在我们之前代码里边,其实就应该是第三步,首先要将模式应用到数据流上,检测复杂事件,然后呢,再将检测到的复杂事件提取出来进行处理转换啊,那这这两部往往都是结合在一起去使用的。首先我们来看第三步,就是把模式应用到数据上这个过程,这个过程其实非常简单,直接调用CP.on方法得到的是一个pattern啊,那这种调用呢,里边传入的是两个参数,一个是当前的data stream,另外一个就是我们定义好的模式pattern。
01:02
这里需要注意的一点就是。当前的data stream是可以指定key的啊,那这样的话,我们传入的相当于是一个k stream这样的话在后边我们要进行复杂事件检测和处理的过程当中呢,也都只针对当前的K有效,相当于哎,我们经过KY之后,后续所做的所有操作就类似于process方式这样的一个操作了。另外还有一点呢。就是当前模式当中定义的复杂事件,我们知道它的发生是有先后顺序的,那这里的先后判断依赖于什么呢?主要当然就是依赖于对应的时间戳了,诶那这里面关键还有一点就是我们得看到底是事件时间语义还是处理时间语义,那默认情况下,我们这里采用的是事件时间语义,看的就是数据的时间戳,那如果是处理时间语义的话,那其实所谓的先后就是数据到达的顺序。
02:07
那假如说当前时间戳如果相同的话,那对应的这个事件又是谁先谁后呢?诶这里我们可以。我们可以看到啊,当前这个Python方法。可以有第三个参数传入,它有这样的一一种调用方式啊,重写的方式,那传入的是一个competitor,也就是传入一个比较器,这样的话我们就可以定义一个更精确的排序规则,假如出现时间说相同的话,接下来我们判断到底谁先谁后啊,因为我们本身在定义这个复杂事件的时候,当前的模式定义的时候就是有先后顺序的嘛,诶,所以这个顺序还是非常重要的。这是关于第三步,就是把模式应用到数据流上,得到一个检测复杂事件。
03:02
这一步做完了之后,接下来我们就可以处理检测到的匹配时间了,那我们其实知道啊,对于stream呢,可以调一些转换的方法,对于匹配到的复杂事件进行检测和处理,我们之前已经介绍过的最简单的这种方式就是直接调用select这样一个方法,传入的呢是一个pattern select方式里边必须要实现一个select方法,这个select方法其实就是。把检测到的所有。复杂事件里边的对应的简单事件都保存在一个map里边,以它作为参数,然后我们可以从中提取对应的每一个匹配到的事件,然后进行组合、处理、转换,最终得到一个我们想要的输出数据进行输出。进行返回,这里的返回值类型就是输出的类型,这就是select的用法,那除了这个select方法之外,我们其实会发现。
04:06
当前在pattern stream里面还有Fla select的方法。这里我们可以看到select和flat select,它们都有各种各样的参数重载的方式,那我们可以看到最简单的这种方式呢,Select里边传入的是一个pattern select方式啊跟。Select调用的时候传入patternon select function看起来是非常类似的,那这里面的区别在哪里呢?啊,区别当然也会非常的明显,我们这里可以看到pattern select function同样也是有两个泛型,一个in,一个out,那这里边呢,要实现的是一个flat select方法啊那。对应的跟之前pattern select function里边要实现的select方法做一个对比的话,那我们会发现同样都有一个map类型的参数,这里边要存放我们当前检测到的所有的事件,匹配到的事件,那另外呢,它还有第二个参数。
05:08
一个collector叫做out的collector。另外我们发现它没有返回值类型。所以自然就想到了这个flat slat跟之前flat map就非常像了,因为我们当前是一个扁平化的输出,所以可以多次调用al.collect方法输出多个数据,那所以当前就是一个一对多的关系嘛,如果说我们当前并不只是输出一次的话,诶,那当前当然就是可以更灵活的输出零次或者一次或者多次的话,当然使用flat select的这样一个方法会更加的合适。它比select会更加灵活一些。那除了这个之外,我们在当前pattern stream的方法调用里边,还会看到一个叫做process的方法啊,那很明显process,这就有点像process方式啊,底层API的这种调用了。
06:09
同样,它里边传入的是一个pattern方式。那我们看起来这样一个定义的话,就跟之前处理函数家族的那种定义非常的像,我们看它底层又是什么呢?呃,我们看到本身它也继承自abstract。Function啊,它也是一个函数,当然在这里边我们就可以获取到运行上下文,可以去注册对应的状态啊,使用状态,那另外呢,这里我们看到它里边必须要实现的是一个process match方法,同样我们也能获取到当前的一个map类型的参数,这个参数里边。包含了当前已经匹配到的所有的事件,那另外呢,也有一个。Collect类型的alt参数,我们知道它跟Fla select类似啊,没有返回值类型,可以灵活的输出零个一个或者多个对应的输出数据啊,那所以调用out点的方法,这时它跟Fla select一样的地方,那区别在于它还多了一个参数context,一个上下文。
07:18
啊,那这里我们就又看到了啊,当前的这个上下文它特点是什么呢?同样它继承time contact,那我们知道这里边我们可以使用当前的时间访问当前的时间戳啊,那或者是访问当前的processing处理时间,可以问到时间相关的信息,另外当前的上下文还有一个方法叫output。那output很明显,这里边可以传入一个。测输出流的标签啊,那另外可以把当前的数据输出到侧输出流里面去,所以我们会发现当前的这一个pattern process function呢,尽管它不是严格意义上的处理函数,这里我们不能去注册定时器,但是它也有处理函数的一些特征,首先它也是一个函数,可以使用状态编程,那另外呢,它有一个上下文,可以去获获取访问到当前的一些时间属性,另外还可以把数据输出到测数处里。
08:21
啊,那所以这就是当前我们对于匹配到的事件进行处理转换的一个具体的过程,那接下来我们可以在代码里边做一个具体的实现。我们直接就把之前。连续三次登陆失败的检测,我们用更加简单的方式啊,之前我们已经提到可以直接使用一个循环模式,T3就可以直接处理了,当然后面还要追加一个当前近邻关系的指定啊,那我们用这种方式再重新做一遍,然后呢,我们可以统一使用一个当前的process。
09:03
Python process方式来去做一个具体的实现,我们看看怎么样去实现这个功能,算是一个改进吧,所以我们这里边可以写一个。Logging field。Detect example。呃,我们可以把这个加一个pro。改进之后的,那当然了,基本的。代码结构跟之前还是非常的相似的,我们干脆可以把整个这个结构都copy过来。都在没方法里边。完整的先copy过来,然后看一看哪些地方可以进行改进,首先我们能想到的就是这里模式的定义,这里边我们定义模式就不再使用NEXNEX这样的方式去定义了,我们可以直接给一个当前的。Times。三然后后面加上一个consecutive,表示严格禁礼啊,严格禁邻的三次登陆失败。
10:08
指定。是严格。近邻的三次。登录失败。然后接下来我们进行当前的这个检测的时候啊,首先我们还是先把这个模式应用到数据流上面,检测到对应的复杂事件,接下来呢,我们进行处理的时候,就不要再用之前的这个select了。我们可以直接来一个。Process。Process里边要传入的就变成了一个。Pattern process方式。啊,那这里边我们会发现当前的这个patternon process必须要实现一个process match方法,这里管参数很多,但其实也没什么关系啊,那我们这里边关键用到的就是这里这个map嘛,啊,这个map里边它的用法还是完全一样,只不过呢,我们现在我们现在只有一个叫做first的这样的一个。
11:10
个个体模式,那这个个体模式里边呢,因为它有量词,它是一个循环模式,所以应该直接这个个体模式可以检测到三个匹配的事件啊,那当然了,这里面我们就不需要把它叫做first了,我们直接叫啊。那然后这三个事件怎么样去提取呢?按照之前我们定义的那种方式是直接map里边去get first get second get,现在我们就只有一个feel,那怎么办呢?诶,那很明显这三个事件都应该放在这个list里。所以我们直接获取这个list的第一个元素,第二个元素,第三个元素,当然得到的就是我们的第一、第二、第三次登陆失败事件。所以接下来我们可以对应的。提取。三次登陆失败事件。
12:03
接下来我们首先得到。First film event。那调用的过程当然就是match,直接去get当前的这样一个K,然后GET0。第一个元素当然就是第一次登陆失败,然后接下来。我们定义。Second film以及。Third film啊,那对应的我们不是更改这里get map里边的K,而是更改后边list里边的索引值一和二,拿出来就是第二次登录失败和第三次登陆失败。然后接下来,呃,当然对应啊,最后我们可以输出一个string类型的报警信息,之前在这个patternon select方式里边,我们是直接return了,现在呢,诶不能直接return,但是我们可以。
13:04
使用alt.collect。去做一个输出啊,那所以我们可以直接把之前拼接好的这样的一个字符串报警信息直接跟在后边作为输出结果输出就可以了。啊,那对应的后边的这个打印也都是完全一样,我们可以再来检测一下,看一看当前得到的结果跟之前是否一致。我们可以看到还是完全一致的,USER1连续三次登陆失败,两秒、三秒、五秒,而USER2没有检测到匹配出来的事件就是我们。改进之后的具体的用法,当然了,在这个上下文里边可以做更多的事情,如果我们用到的话,可以使用这个ctx。调用它里边的time stamp也可以去调用它的奥的方法去实现测输出流的输出。
14:02
就是关于在CP里边怎么样去处理匹配到的所有事件的一个过程,我们可以调用各种不同的API。
我来说两句