00:00
我们已经对CP有了充分的了解,那在本章的最后呢,我们再来做一点扩展,讲一讲CP的状态机实现。都知道CP其实是flink为我们提供的应用层级的一套API,它可以让我们方便的进行复杂事件处理,那我们自然想到,其实这套东西使用data stream应该也能实现,只不过我们前面说了,使用stream API或者process的话,我们需要做状态编程。而且里边的。处理逻辑可能会非常非常的复杂,可能是各种if else的嵌套,诶,那那我们自然想到有没有更加简单的实现方式呢?如果我们仔细思考的话,会发现CP当中最关键的核心点模式的定义,它其实跟正则表达式非常非常像的。而正则表达式,它的引擎的底层实现是什么呢?其实就是一个状态机啊,那或者说就是所谓的NFA是一个非确定性有限状态自动机。
01:13
说我们当前。给整个系统设置一个状态,然后呢,每来一个数据,就相当于对于当前状态有一个输入,那么在对应的状态和输入的作用下,当前状态可能就会跳转到另外的一个状态。哎,那整个这个状态根据不同的输入进行跳转的这个过程,如果我们把它构建出来的话,这就是一个状态机啊,所以接下来呢,呃,我们就是简单的介绍一下。之前的。连续三次登陆失败检测的这样一个事例,怎么样用状态机来进行一个实现?我希望通过这样一个例子,让我们能够更加深刻的理解cep的底层原理。
02:05
那首先我们可以先画出一幅图,我们先来看一下连续三次登陆失败,它的状态跳转是一个什么样的过程,那首先我们会想到初始状态,哎,我们就直接定义一个引initial手吧,引initial手初始状态基于这个状态,如果来了一个数据是F的话。那我们就想到了当前就应该跳转到一个新的状态,我们当前就把这个状态叫做S1第一个状态,其实我们知道当前就是要去继续等待失败的数据了吗?所以如果说是的话,我们就做状态跳转,那如果是success来了的话。其实我们知道success来了之后,直接就直接就终止了,当前的状态转移就已经结束了,我们就直接退回到初始状态,继续等登录失败的数据就可以了啊,所以这里边我们可以单独的再定义一个所谓的terminal状态,Terminal状态的话,其实我们知道它没什么特别的用,我们只是明确的告诉当前的判断,这一轮判断已经结束了啊,所以一旦到达terminal状态,就可以直接跳转重置到初始状态。
03:17
然后同样的接下来基于这个S1状态,如果又发现诶一个登录失败,然后紧接着又来了登录成功的话,那同样还是termin直接把它跳转回去。如果说S1状态下来了的话,那这个时候我们应该是跳转到S2状态啊。连续两次登陆失败了,那如果S2状态又来了一个登陆成功的话,那同样还是terminal状态,再退回到初始状态,那如果S2状态。再次来一个非状态的话,登录失败的话,那那当前就变成了一个match的状态,就真的检测到了一个连续三次的登陆失败。啊,那当然了,接下来的逻辑我们可以自己去定义,按照我们之前的定义方式,那是所有的连续三次登陆失败都要检测出来,所以我们知道,假如说有连续的四次登录失败的话,那很显然以第一个开头,这个要检测第二个开头。
04:19
这个也要检测,哎,这也是同样连续三次登陆失败。那所以当前的match如果已经检测到之后,它退回的状态是S2。啊,也就是当前相当于我们检测到了一组匹配之后,接下来是基于已经检到的两个登陆失败,要再去等待下一个登陆失败时间。所以是这样的一个状态转移的过程。啊,当然了,本身这道问题我们还需要去考虑事件时间语义下的乱序数据的问题,那如果要考虑这个的话,我们还需要这就是类似于之前我们想到的那种处理方式啊,还需要去设置一个缓存,所有数据来了之后,先要做缓存,然后进行watermark的推移啊,那根据这个watermark延迟时间推移到一定程度之后,把对应的缓存里边的数据按照时间戳做排排序。
05:16
在之前所有的数据已经到达的所有数据拿出来去做一个当前的判断,但是我们想到这个过程就太复杂了,所以当前干脆我们就不看当前的事件时间了啊,我们就只是按照它到来的顺序去做一个这个状态转移,只要理解一下这个状态机怎么实现就可以了。所以接下来我们在代码里面具体做一个实现。那当前我们其实是实现了一个NFA,所以我们就直接写一个NFA。其实本质上。所处理的内容还是之前我们所做的那个三次连续三次登录失败啊,那所以前面这一部分我们还是可以把它直接copy过来。
06:09
当然了,呃,到这里为止,后面这个时间出口watermark我们就不需要了,因为我们这里只考虑它的先后顺序啊,相当于按照处理时间去判断就可以了。直接把这部分copy过来啊,那当然了,这里边我们可以直接把它定义成data source,这里我们可能还需要提前做一步操作,就是应该要按照当前的user去做一个分组,哎,那所以这里边我们其实即使是不提取type,不去生成water,也应该做一个K啊,那这个方便我们后续的操作,那所以这里我们可以把当前的这个。对应的K先指定出来。User ID啊,那所以把这个指定出来之后,上面的类型也要改成。
07:01
啊,或者我们直接写这个,呃,Dataream也是可以的,那当前的类型是stream。这就是当前第一步获取登录数据流,而且进行分组的一个过程。那下面我们应该有一个花括号。最后env execute执行起来,这个流程我们定义好。然后接下来那当然就是把输数据,我们要构建一个状态机,然后每一个数据来了之后,就会引起我们当前状态机的状态改变,状态迁移,然后一旦发现达到了对应的那个match的状态的话,最终的这个状态的话,那我们就会输出。对应的一个报警信息,而且再把它重置,跳回到S2状态,其实就是这样的一个转换过程,所以接下来要做的操作也很简单,就是。数据。按照顺序依次。
08:03
输入。我们用状态机。进行处理。其实主要就是要涉及到状态跳转。所以当前的这个处理过程我们应该定义成一个,呃,使用什么样的API去进行处理呢?其实这个过程也比较简单,因为我们没有涉及到时间操作,最多就是用到了一些一些状态啊,那所以当前肯定就是说只要做一个状态编程就完了啊,那当前我们干脆啊,就直接logging in the stream。活的输出的话,哎,那当然是使用map比较比较方便啊,我们直接去定义这样的一个function,应该就可以搞定当前这个需求了,那所以这里边我们可以直接拗一个,比方说我们当前这个就叫做state machine。状态机啊,然后。
09:00
其实应该是啊,我们把它定义出来。然后这是当前我们得到的一个。最终得到的结果应该是一个warning string。最后warning stream做一个打印输出,这就是我们想要的最终的效果啊,那当然了,这个类型啊,输出的类型应该是。
我来说两句