00:00
我们已经了解了CP的基本概念,那接下来呢,当然就是先做一个快速上手,看一看在代码当中怎么样使用flink cp去实现具体的需求。啊,那当然了,想要使用CP首先要先引入相关的依赖啊,那我们这里引入的依赖呢,就是直接叫做link cp,然后我们现在是skyla版本的API,所以后面加上skyla,然后指定对应的LA版本,而对应的当前这个依赖的版本呢,自然就是跟flink的版本一致了,01:13点零。这里需要多解释一下啊,就是所有的连接器和库都是不包含在弗link的核心依赖当中的啊,因为我们知道如果把这些都加进去的话啊,有一些场景根本用不到,那整个这个flink就显得比较臃肿了嘛,所以这些如果我们要想使用的话,都需要额外去进行引入,哎,那所以如果说我们在flink集群当中想要提交运行CP相关的这些作业的话,那就必须像我们运行这个Li CQ一样啊,把相关的依赖的那个抓包要放在当前集群的lib library目录下边啊,那所以从这个角度来看的话,就是CP啊,跟FCQ其实是一样的,都是最顶层的应用级别的API啊,它主要是针对这个上层的应用而引入的库啊。那接下来我们就首先先将对应的CP依赖引入到号文件当中,直接copy过来啊。
01:35
接下来我们在微问这里刷新一下,好,现在已经引入了flink c scale2.12,然后接下来呢,我们就可以去尝试去实现一个具体的应用案例了,那我们现在要去实现的呢?呃,非常简单啊,就是刚才提到的想要去检测用户的一些异常行为,做一个风险控制啊,那我们这里检测的就是用户的登录行为,如果连续三次登录失败,那么就要输出一个报警信息啊,那很显然了,这是一个复杂事件的检测处理啊,所以我们直接用CP就可以搞定它,那首先呢,我们先来定一下当前要处理的数据的事件类型啊,哎,那现在我们这个就不应该是用户去访问某个页面的这个event了。
02:19
那现在我们应该是一个登录事件,那怎么样定义这样一个登录事件呢?呃,最简单的方式其实就是里边有一个用户IDUID,然后接下来呢,对应它登录事件可能有某一个IPIP地址,另外呢,还有一个登录的类型,事件类型,也就是说本次登录到底是成功还是失败啊,那最后再来一个时间戳,主要就是这些字段啊,把这个定义好,作为一个样例类声明出来,接下来我们要创建的这个数据呢,就都是log event对应的一个对象啊,那我们可以看一看有哪些测试数据啊,比如说现在我们接下来呢,呃,就定义这样的一些logging event,我们看到有USER1和USER22个用户的登录事件,然后呢,他们可能基于不同的IP地址去做了登录,有些诶是fail,有些是success,所以我们会看到啊,这里USER2它的登录过程当中,有一次是登录成功了,是success而。
03:19
USER1呢,它有连续三次登录全部都是失败,都是废物啊,所以按照我们的检测标准啊,只要连续三次登录失败就应该报警,那么USER1应该报警,USER2呢就不应该报警,这个success那是没有问题的啊,就不应该报警,所以接下来我们就在代码里面看一看。怎么样用cep去实现这个过程啊,那当然首先我们可以思考一下,就是如果说啊,我们没有学过CP,直接用之前我们学习过的API,怎么样去处理这个需求呢?诶,那自然就想到了,那状态编程嘛,你现在既然要连续三次登录失败,诶,那我就先检测之前有没有登录失败,如果有一次,哎,我就保存下来,保存成状态,然后如果再来一次的话,再保存成状态,所以我可以用一个列表状态,或者是用两个值状态保存已经来的这些登录失败事件就可以了嘛,等到第三个事件来的时候,诶匹配成功了,那就输出结果。所以整体来看的话,这个其实使用状态编程也并不难实现,当然了,这个需求其实比我们之前所说的要简单很多啊,我们之前说的有可能还涉及到时间啊,那就是可能在一段时间内。
04:30
连续三次登录失败,这才是我们想要的这样一个结果啊啊,那现在我们先简化一点,就是三次登录失败就报警,好,那接下来我们就在代码里边尝试去做一个实现。现在我们应该是新的一章。那是第12章,所以我们创建一个package。CHAPTER12。然后接下来我们实现的这个需求定义一个sky object,现在我们主要是检测用户的登录失败事件。所以是。老。
05:01
做一个检测。那方法先写出来,那首先这个处理流程呢,其实还是一样啊,当然现在跟这个CQ就没关系了,我们不去定义表执行环境了,而是直接定义当前的流失执行环境就可以,所以首先stream execution。Environment。好,我们把这个引入啊,那还是get把它命名成env。同样不是正确性,我们先把这个全局的并行度设成一啊,这里需要注意就是为了后边做这个流失转换的话,还是把这个下划线先引入啊,然后接下来诶,那就可以去。读取数据源。先得到一个我们要处理的这个登录事件流啊,那所以。我们在这里呢,当然就是烟直接from elements啊,把所有的这个测试数据直接拿过来就可以了啊,那我们就从文档里边啊,把测试数据做一个复制。直接拿过来。
06:01
然后接下来啊,在这里的这个long event我们还没有去定义啊,所以在上面应该要定义一个。定义。登录事件的样例类。Case class,我们首先要定义出来,这个就叫做login。啊,那当然了,这里面需要对应的这个字段啊,首先第一个这个应该是user ID。然后接下来第二个字段,我们把它叫做IP address。哎,那第三个字段,这个是它的event type。登录的一个类型啊,最后还有一个是时间戳。Time step。长整形的时间戳啊,那把它定义好了之后,接下来读取数据源得到的这个我们就可以把它叫做。Logging event strip登录事件流啊,那当然了,假如说啊,后边我们还涉及到了跟时间相关的操作的话,那我们基于事件时间的话,那就应该要去指定怎么样去提取时间戳,然后生成水位线啊,这个都是需要有这个流程的啊,当然这里我们好像没有涉及到时间相关啊,不做这一步也行。
07:12
为了形成这个固定的习惯,我们还是把这个做一个提取吧,啊,那当前我们这个数据很明显,2345678,这个都是按照时间顺序依次排列的,不涉及到乱序,所以我们就可以直接assign asending time stamps啊,升序的数据直接传一个提取时间戳的方法就可以了,这是我们基本的这个数据源的一个定义,然后接下来上面如果这个作为第一步的话。接下来这就是真正我们要进行CP处理的具体步骤了。接下来第二步。那就是要首先得定义一个进行复杂事件处理的匹配规则,也就是我们所说的pattern模式。定义。Pattern。要检测的是检测连续三次登录失败事件。
08:06
这就是我们想要的这个模式啊,所以这里边我们直接就把这个叫做patternon吧,那这个patternon怎么样去定义呢?诶,这个其实非常简单,我们就是使用pattern API,好,那这个pattern API当然是CP帮我们提供的了,它有一个类本身就叫做pattern,哎,我们引入link cpla.pattern下面的pattern啊,那当然了,这里我们使用的就是它对应的这一个伴生对象啊啊,那当前我们创建这样一个pattern的对象实例之后,接下来要怎么去定义呢?我们看它接下来能够调用一个叫做begin方法。这个begin方法就会返回一个pattern类型的对象,所以begin指的是什么呢?当然就是指的我们当前这个模式以什么样的事件开始啊。所以这个在代码当中其实非常简单的,我们就直接调用pattern.begin哎,这里注意啊,这个begin后边有一个中括号,我们看到。
09:04
它这里有中括号,也就是带着一个泛型参数啊,这是一个泛型方法,这里的参数呢,其实就指定了后边我们在这个检测事件的过程当中,要检测什么样类型的事件啊,当然了这个类型必须是我们之前要处理的这个整个事件流里边啊,数据类型的子类型啊,那所以这个X我们可以单独的定义一下,那这里我们能够去检测的事件类型的也没有别的。特别的这个子类型啊,我们直接定义这个logging event就可以了。然后接下来我们看一下这个begin方法,它所要传入的参数,这里边呢,最简单的传参方式就一个参数,就是一个name,一个string类型的名称,这是什么意思呢?诶,当然就是我们现在不是要定义一组这个匹配规则吗?现在是begin,就是当前这一组事件里边,最前面第一个事件到底是什么啊,那当然了,到底是什么的话,首先你得给他个名称嘛。
10:09
比方说我们就把它叫做first第一个事件吧,所以这里你给的这个名称。就是这里我们指定的这个参数啊,比方说就叫做first。或者说我们直接叫做first few也可以啊,就是第一次失败。那我们想到你这里边叫他是第一次失败,它就是失败吗?所以我们当前选取这个事件的时候,还得有对应的单独的这个匹配规则,你要把它的特征要定义出来,它有什么样的特征呢?那我们接下来可以看到它后边可以调用一个where方法,指定一个筛选的条件。啊,那所以这里其实就是where里边我们看啊,传入的就是一个condition,它可以去实现对应的这个类啊,比方说实现一个。Condition啊,可迭代的条件,那更简单的方式呢,当然就是直接传入一个拉姆达表达式就可以了,我们看这样一个拉姆达表达式的写法,就是当前输入的事件是loging event类型,也就是它这个条件的参数,然后呢,返回的就是一个波ing类型,布尔类型的逻辑判断值,那所以就是说如果当前返回的这个值是true的话,那就筛选出对应的事件,如果要是为false的话,当然就不匹配了,当前事件就不匹配。所以这里面我们的规则写一个拉姆达表达式的话会非常的简单啊,直接就是下划线点当前的英文type,那就必须要去。
11:37
等于feel。这就是第一次登录失败了,所以我们可以直接注释一下,这就是。第一次登录失败,实践先把它检测出来。哎,那我们说这个pattern里边,你不光只有一个事件啊,后边还有很多个,我们是检测一组事件,复杂事件,那所以接下来怎么办呢?接下来要检测的,哎,那我们看就得有一个连接词,把后面的事件要连接起来了,我们看它可以有的连接词。
12:11
Followed by followed by就是。后边跟着的是什么样的事件,那我们现在是只要跟在后面就行吗?不是,我们是要紧跟着,紧跟着呢,这里有专门的一个连接词,叫做next next很好理解,就是下一个嘛,哎,所以这就是紧跟着的连接词的意思,然后我们看next里面要传的呢,也是一个string,也是一个名称,刚才我们看到啊,本身这里要传的。本身可以是一个Python啊,就是可以嵌套一个模式进去,那当然我们这里边没必要那么麻烦啊,我们用这个最简单的实现就可以NEX里边直接传一个内传一个string类型的名称,哎,所以这里所谓的名称就是指的第一个事件begin,这个事件后边紧跟着的第二个事件叫做什么名好,那我们第一个事件叫first few,那第二个事件当然我们就可以叫second few了。
13:05
第二次登录失败事件,那当然了,对应的这个事件也可以跟一个where做一个条件判断的筛选,那所以这里面的规则当然也一样了,完全一样嘛,Even type要去等于fail。这是。第二次。登录失败事件。哎,那同样道理,我们会想到那一样的嘛,接下来再来一次的话,不就又是跟在后边的一个next吗?啊,那只不过我们把这个稍微改一个名称,这个叫做third few,呃,然后接下来同样它的筛选条件也是in type等于feel,这就是第三次登录失败事件,这就是我们所定义的连续三次登录失败事件的。匹配规则,一个pattern就定义好了。
我来说两句