00:00
接下来就给大家把flink里边的cep具体的来做一个讲解啊,那首先我们来说一下到底什么是cep,其实cep它主要就是flink里边的一个,我们说它是一个一个库了啊,就整体来讲,它的层级是比较高的,大家可以认为它对应着就是table API和link CQ这样一个层级啊,就顶层的一个调用,我们看到底层的很多东西都已经包装起来,我们是不需要知道的,呃,不需要做细致的了解,你直接拿来用就可以了,那它的全称其实是什么呢?CP是个缩写,全称就是复杂事件处理,Complex event processing对吧?翻译过来复杂事件处理,那其实具体来讲flink cep就是flink里边实现的用于做复杂事件处理的一个库啊,那那这里面就涉及到一个什么叫做复杂事件处理呢,就是像我们之前提到的啊,假如说我们要检检测这个用户的连续登录失败。
01:00
败的事件,那大家想是不是这就不不简简单单是一个事件来了之后失败,哎,这这就检测到就完事了,而是要来了一个失败,后面又来一个失败,对吧?而且他俩之间可能还需要有关系,比方说我们要求他须紧跟着,中间不能插着有这个成功对吧?啊另外我们要求可能还必须它发生在两秒钟之内,前后这个时间不能超过一定的限度,这些都是我们对它的一些要求,所有的这些要求可以就定义成一一种什么呢?一种事件序列的模式,对吧?啊,就是相当于我们定义一个模式,就是说我们要检测的这个事件,按照这个先后的顺序啊,先来什么,后来什么,在多长时间范围内,然后要求是紧挨着还是可以不挨着,这这就是我们所说的一个可以定义的一个模式啊,事件序列的模式,CP这个复杂事件处理呢,就是允许我们在无休止的事件流里边检测到这样的一些。
02:00
符合模式的事件序列,然后呢,这样就可以让我们有机会掌握这个数据里边感兴趣的啊,最重要的那一部分了,提取出来连续登录失败,或者说其他的一些数据啊,那这里边给大家呃,定义一个东西,就是说一个或多个由简单的事件构成的事件流,就是这个,其实或者说我们叫事件序列啊,它呢就可以定义一定的匹配规则,对吧?啊,就是我们说的那个模式,然后呢,我们基于这个模式进行匹配,就可以得到我们想要的那些符合规则的事件,复杂事件啊,所以我们现在想做的事情就是来了一个常规的事件流,现在我们想提取出,提取出自己定义的符合规则的那些特殊的一组一组的事件,对吧?啊,其实就是这样的一个过程,CP就是主要用来做这件事,然后现接下来我们看一个图吧,还是看一个图啊,大家看一下这里边其实就把这个。
03:00
输入和输出,还有我们所谓定义出来的规则列的就非常的明显了,这里边输入数据啊,这里边分成了两行,我们可以认为这里做了一个KY,做了一个分组,那分组之后,诶,这些数据我们到底是这个数据杂乱无章对吧?大家看这个用不同的形状来表示这个数据里边的一些特点了,比方说这我们当前的数据里边有三角,有圆,呃,有这个长方形,那我现在要什么呢?如果说我要的就是要求我要所有的圆形,那这个这就相当于一个简单的filter嘛,对吧?啊,或者说我要把所有的三角形转换成长方形,那这就是一个简单的map嘛,那这个其实是没有任何的,呃,对于我们这个简单处理来讲,这是没有任何问题的,我们现在要考虑的不是这么简单的问题,那我要考虑的是什么呢?要从这所有的数数据流里边找到圆圈后边跟着长方形的这样的一组特殊的事件,这。
04:00
就叫一个复杂事件对吧?哎,这就叫一个我们定义好的事件序列要满足的一个规则,哎,所以大家看我们的目标其实就是什么呢?要从简单的事件流里边发现一些高阶的特征,那就是前面有什么,后面有什么,诶合成一组,它是不是能得到这样的一个输出,然后应用了这个规则之后,我们的输入当然就是这样的简单事件流了,应用这个规则之后得到的输出应该是什么呢?那就是我要在这个流里边应用这个规则,识别每个事件之间的联系,然后找到符合规则的那些一组一组的复杂事件,对吧?哎,比方说上面这里边,诶,我找到一个先是圆圈,后面是长方形,下边也找到一个,这里边先是圆圈,后边是长方形,对吧?啊,然后这里边我输出的时候,大家看到相当于就把别的都滤掉了,都过滤了对吧?啊,就有点像一个,呃,大家可以理解成一个比较复杂的一个filter一样,就拣选我们想要的特殊的那些数据,然后我们说把它包装成一个大的一个一一个数据整体,然后得到得到的这个输出的数据流里面就变成了这个样子了,对吧,就只有这样的一组一组的数据输出。
05:17
这就是我们这个cep整个做数据处理的过程,然后接下来再给大家具体的把这个模式到底是什么展开来讲一讲,这里边我们看一下这个拍,呃,就是具体来讲定义什么叫做模式呢?就是前面我们说的要处理复杂事件的那个规则,你定义好的规则,哎,前面是三角,后面是长方形,这个就叫模式,就叫做pattern,呃,然后CP里边呢,就给我们提供了一整套API,用来处理这个复杂事件,这套API就叫做patternon API啊,所以当时我们也会发现,经过转换之后,我们先定义一个pattern,然后把pattern应用在当前的那个事件流上之后,得到的就叫做一个pattern stream,对吧,然后基于这个pattern stream再去调其他的一些方法,所以大家看看这个整个的处理流程,主要就是这么三步,第一步就是干什么呢?定义一个pattern对吧?啊,这个pattern在定义的时候就是我们说的规则。
06:18
吧,那我们这里边的复杂事件的规则是什么呢?因为是流失数据,我们就要定义先来什么,后来什么,先来后到对吧?哎,就是一开始是begin,然后呢,你先来什么后来什么,那你肯定要有一个条件定义啊,就是我begin的时候,这个事件首先是给一个名称啊,当前这个叫start啊,比方说这个随便给大家看到,这个名称是随便给的啊,当时我们第一个叫叫做first few啊,第二个叫second few对吧?这里边你看它叫的叫start middle,然后and这个都是相当于就是一个名称的指定,然后后边大家看用where来指定当前的一个条件,对吧?嗯,你第一个事件到底是什么类型的事件,比方说我要求ID是那个42的那个事件放在这儿,然后呢,哎,后边可以定义接下来是什么数,呃,是什么事件,比方说我来一个next,后面这个叫做middle,然后还可以定义什么呢?呃,定义sub type,就是定义接下来的这个数据的子类型到底是什么,对吧?大家看到这个类型还可以专门定义一下。另外还。
07:18
可以定义,哎,我要求的是这是温度值啊,要求当前的温度值大于等于十度,这也是完全可以的,那后边还可以有followed by,哎,Follow by前面给大家说过,就是就跟在后边对吧?啊,就要求没那么严格,然后也可以要求它,接下来后面又跟的是什么呢?Name等于and,哎,这就是我们定义的一个pattern,一个模式,定义好了复杂事件提取的规则,然后第二步做什么操作呢?把定义好的这个pattern应用到我们输入的事件流上,调用cep.pthon方法。把这两个结合起来,得到的就是一个pattern stream,然后大家要注意得到的pattern stream我们并不能直接,呃,就是像像这个data stream一样,直接做转换,做输出,对吧?必须要经过一个什么?呃,相当于一个处理的过程,这有点类似于之前讲过的split select,我们现在是做过这个pattern之后得到的这个pattern string也要做一个select,调用这个select的方法,然后里边要传一个什么呢?Pattern select function对吧?啊,这就是定义我们的处理的具体处理的方法,也就是说前面我们应用之后,已经可以把对应的这个数据应该能够提取出来了,对吧?那现在select是不是就要提取出来啊,把它提取出来,另外还要定义,因为大家想到这是一组一组事件嘛,当时我们说它是保存在一个map数据结构里的,那你现在提取是提取出来了,你最后到底要输出什么呢?一般情况我们不能直接输出这个map。
08:52
所以我要再做一个转换操作patternthon select的方式是不是就是把它提取出来做转换操作的这个过程对吧?哎,你这里边就是想转换成什么类型,你自己定义一个out的数据类型就可以了啊,这就是具体后边这个select的这一步要做的操作啊,然后整体的这个处理流程大家都已经知道了,我们也已经在呃处理这个连续登录失败的代码里边做过实现,那现在我们再来详细的给大家解析一下里边的每一步到底又有哪些复杂的定义,那首先就是大家看到前面有一个叫做呃个体模式和组合模式的一个区分,哎,这里边主要是说什么呢?我们不是讲了这个定义这个pattern吗?定义定义出来这个叫模式嘛,那模式里边有没有具体的分类呢?有,哎,这里边具体的分类最主要的就是分成了个体模式和组合模式,哎,那个体模式从字面就非常好理解。
09:52
什么叫个体模式呢?就是单独的一个规则的定义对吧?那就单独的一个模式的定义就叫做个体模式,具体来讲就是前面我们定义这个patternthon的时候,大家看到很复杂对吧?就是前面发生什么,后面发生什么,然后这个后面又紧跟着发生什么,那这个其实是挺复杂的一个过程,那组成这个复杂事件序列里边的每一步操作,就有点像我们说的这个每一步一样,对吧?啊,就是第一步,我这个begin start,这里边要提取什么事件,这就是一个个体模式啊,那中间这里边next middle,这里面提取什么事件,这也是一个个体模式,所以大家看到个体模式是可以用这个类似于一个连接词一样的东西把它连接起来的,对吧?底层都是一个方法调用啊,那我们这里边可以认为它就相当于是用这个next followed by,还有是begin啊,把它们就是串起来了啊,所以这串起来的每一行,这里边的这个操作都是一个具体的个体模式。
10:52
那然后就是很多个个体模式组合起来,我们说的拿这个连接词把它串起来,就形成了所谓的组合模式,或者可以叫模式序列啊,可以有这样的一个划分,这里大家要注意一下,所有的模式序列都必须以一个初始模式开始,什么叫初始模式?就一开始必须是t.begin对吧?当时大家也看到了patternon,呃,Patternon去调它底层的方法的时候,只有begin没有上来,一上来就调next的啊,所以这里边如果定义一个模式序列,必须以这个begin开头,然后另外还有一个特殊的这个模式呢,叫做模式组,什么叫模式组呢?这个就复杂一点,相当于是一个模式的嵌套啊。呃,简单来讲就是什么呢?我定义一个这个个体模式的时候,可以在里边作为条件,传什么呢?传一个模式序列,就里边又是一组模式的定义,对吧,我直接把它定义在这个个体模式里边,这就相当于变成的一个嵌套。
11:52
套的形式了,对吧,就是我里边要检测的某一个,呃,具体的某一步操作,里边又检测的需要检测的是一组事件啊,类似于这样的啊,就是层层嵌套这个模式组,一般情况可能只有非常复杂的需求才会用到,所以一般情况可能用的场景比较少,大家稍微做一个了解就可以,一般我们主要是用什么组合模式对吧?把个体模式结合起来,这就是这个Python API的一些主要的内容,整体的介绍。
我来说两句