00:00
了解了CP的基本概念和原理,那接下来我们还是通过一个简单的来做一个快速上手,先来看一看在代码当中cep到底该怎么用。那在使用CP之前呢,我们首先还是需要先引入相关的依赖啊,那这里我们要引入的就是flink cp后边跟着的是SC版本,下面这个version呢,当然就是对应着flink的版本了,我们需要把它添加到当前POM文件的。Depend。把它先引入啊,那所以我们会知道对于这个CP而啊是高层的API跟fli CQ其实是一个啊,应用层级的,那对于flink而言,本身它的核心部分是不会去包含任何的连接器和对应的,呃,像这个CP啊,CQ啊或者是呃。
01:01
Learning机器学习ML这一部分的库其实是不包含的,因为link要保持尽量少的核心依赖,这样的话就可以避免冲突,那所以如果说我们要在集群里边去运行flink cep的话,那还是应该把对应的抓包就当前这个CP的依赖,我们应该把它放在。Library Li目录下边啊,那当然了,就是我们在开集成开发环境里边,想要去测试CP的时候呢,就直接把这个dependency引入就可以了,注意在打包当前作业的时候就不需要把它打包进去了啊,那这是一个通用的一个依赖包,我们直接放在利目录下。那有了相关的依赖,那接下来我们就来看一个最简单的需求,就是我们前面所提到的连续登录失败,我们检测用户这样的异常行为,然后来输出一个报警信息啊,所以很明显这就是一个复杂事件的检测处理了,我们首先可以想象一下。
02:06
这样一个需求,如果说我们不用其他更高的这些API,这些库,我们直接用之前学习过的API能够实现吗?呃,仔细想的话,其实应该是可以实现的,因为我们当前是连续三次登陆失败嘛,那我们可以直接去定义状态,我们定义。保存前两次登陆失败的对应的事件,然后接下来如果检测到第三次登陆失败的话,我们就直接输出一个报警信息啊,所以这个其实还是比较容易能够想到的。但是这个逻辑看起来很清晰,我们只要保存前两次的登陆失败事件就可以了,但这里面又涉及到另外一个问题,就是那假如说中间来了一个登录成功怎么办呢?比如说我们可以看一下。这里给出的示例我们可以看到,这里边定义了USER1和USER2,那自然我们知道对于这样一个连续登录失败的检测,很明显应该是针对同一个user来看啊,那我们看这个USER2 user2这里我们看到它先有一次fail,后边呢又来了一个success。
03:21
在后边又是连续两次费,那这个能叫做连续三次登录失败吗?很明显这个不叫,因为中间它有success,那说明用户是可以正常登录的吧,哎,那可能其他的这一个费用只是他的一个操作失误。这个敲错。键按错键盘输错密码了啊,那所以对于这样的场景,我们是不应该把它作为匹配起来的一组复杂事件。那如果说我们在代码当中要去判断的话,那很显然就应该不停的去判断当前诶。到来的这个事件是否有success,如果有success的话,诶,那相当于之前的那个费用也就不要继续保存了,还得去清空啊,接下来我继续按照这个费用再去保存第一次费用,第二次费用啊,所以这个中间如果插入success的话,是会打乱我们之前检测保存下来的那个状态。
04:18
除此之外,我们还得考虑到,诶,那当前我们这个数据看起来这是按照顺序一个一个来的嘛,那假如说我们现在是分布式的流处理,如果出现了乱序数据怎么办呢。假如说我们看到这个USER2第一次费用,然后呢,七秒的第二次费和八秒的第三次费用先来了之后。六秒钟的这个success才姗姗来迟,诶,那对于这样的行为,这算连续三次登陆失败吗?很显然,我们当前这个时间它的先后顺序啊。因为我们这里面应该按照事件发生的时间来判断它是否连续,很明显当前的这个数据是否乱续,不应该影响到我们最终的结果输出。
05:06
所以说那正常来讲的话,这个六秒钟的success放在后边也应该当前这个它不叫做连续三次登陆失败啊,那所以我们就又应该得有更加复杂的考虑了,就不能是直接看到,诶之前有一次两次,然后来了第三次就直接输出了,我们还得判断,那等一下是否有乱序数据,本来应该在之前发生的这些事件,稍后还会再到来呢。哎,那我们这个要处理的逻辑就太复杂了。那直观来看的话,当前我们使用data API或者使用process都不是那么容易能够实现,当然肯定是可以实现,但是很复杂。所以接下来我们直接考虑对于这样的一个复杂事件处理的需求,直接用cep来做一个实现。好,那接下来我们就在代码里边直接来看一看怎么去写这样的一段代码,CP的调用。
06:04
那这是新的一章,我们首先还是去创建一个新的package。第12章,然后接下来我们还是创建一个Java,当前我们是要做一个。Login field检测detect。这是一个具体的案例了,所以我们写example。把它放在这里。然后接下来我们首先啊。P把义出来,接下来呢,我们要做这个登录事件,发现它续进行那里,我们发现首先应该有对应的事件类型才行啊,像之前我们定义好的这个event呢,它其实是用户的一个访问点击的一个事件啊,那这里面只有一个URL啊,那这里面我们并没有看出用户的一个登录状态,所以之前的event就不是我们当前所要处理的事件类型了。
07:03
那当前呢,我们就单独的去定义这样一个。登录事件类型,好了,哎,那所以接下来我们还是单独定义一个列,直接放在当前第12章的这个包下边,把它叫做log in event。那这样一个登录事件里面应该包含哪些字段呢?呃,我们就用最简单的思路来做一个抽象,自己去给它一个定义吧,啊,那首先当然应该有用户名了。所以。里面的属性我们都定义成public类型啊,这是flink对于类型系统要求的啊,必须这么去做,那首先我们给一个user ID,我们就定义成string类型吧,或者如果说我们的系统里边是长整型的话,也是可以的。除了用户ID之外,另外呢,呃,用户每一次可能他会基于不同的IP地址去做登录,所以这里面我们还可以记录一下每次登录的IP地址。IP address。
08:01
另外还有一个非常关键的字段。因为我们要。检测当前用户登陆的状态到底是成功还是失败,所以我们应该有一个字段去表示它到当前这个登陆事件的类型到底是成功的还是失败啊,所以接下来我们可以有一个public string,同样还是这个,我们叫做event。只有两个取值,一个是success,一个是。啊,那最后还是常规的,应该对流逝处理的事件里边保留一个时间戳长整型的time,这就是我们的基本定义。那当然了,在flink定义的这个po类型里边,我们应该要有一个空餐的。构造方法,那对应的带参数的构造方法,我们也可以把它完整的列在这里,方便后边我们去构造对应的。对应的对象啊,那另外还有我们可以直接把这个to string方法也直接写出来,方便我们打印输出,这就是基本的po类型的定义,接下来呢啊,那我们就可以在这里边去定义对应的数据源了,那这里边数据源定义之前,我们首先还是得有当前的流失执行环境。
09:16
因为flink cp它的调用方式其实跟datapi非常类似的,也是基于去进行检测的这个过程,所以当然就不涉及到像之前我们那个里边要去定义表环境了,只要当前基于stream execution environment流执行环境就可以了啊,所以当前的这个定义过程跟datapi调用的。过程是完全类似的啊,先get execution environment,把它叫做同样还是不是一般性,我们把它全局的并行度定义成一,哎,这样的话,当前的测试看到的就是完全按照顺序的这样一个结果了啊。那首先接下来的第一步,我们应该获取。
10:02
登录数据流,其实就是登录的事件流啊,就是一组事件,我们要输入进来。因为。这里我们就直接把所有的测试数据都。用from elements这种形式定义在这里就可以了。那当然,这里边我们就需要去一个logging event的对象了,这里边具体的数据可以直接参考。这节当中给出的示例代码,我们直接把它copy过来。这里主要是定义了两个用户,USER1和USER2,呃,USER2前面我们都已经分析过了,他在四秒的时候有一次登录失败,但是呢,六秒的时候有一次登陆成功,后面七秒八秒又有两次登陆失败,所以它其实并不是连续三次登陆失败,我们不应该把UR检测出来报警。而USER1呢,这个就很明显,尽管中间插入了USER2啊,但是我们知道对于登陆行为的检测显然应该按照user去进行一个分组啊,我们只针对当前的用户去检测他的行为就可以了,那USER1很明显两秒、三秒和五秒连续三次登陆失败,所以我们预期最后结果是检测出来USER1。
11:19
连续三次登陆失败报警,USER2应该检测不到,不会报警,这是我们的一个基本的预期啊,那当然了,当前要对它进行处理的话,很显然应该还需要去提取时间戳定义对应的,所以接下来我们在调用time maps那里边的话当然是。Out,那这里面我们可以看到基本的这个时间呢,是按照顺序依次定义的,所以这里边我们可以首先直接给一个。du.zero那当前我们应该有对应的类型,就是logging。
12:01
然后后边给一个time。Time stamp a sign,你有一个zable time stamp提取时间戳,当然当前提取的就是time stamp这样的一个字段,那这里边后面加了000。很显然我们是按照。按照这个毫秒数来定义的,所以这里边我们后面也不用再去乘以1000啊,做这样的转换,直接获取出来就是一个毫秒数。这样的话,我们就把当前的事件流获取到。我们这里可以定义一下,比方说这个就叫做。老。Event。Strip。登陆事件流。然后按照CP的处理模式,它的第二步,接下来要干什么呢?我们得定义一个,定一个复杂事件的规则,它的组合规则,这也就是我们所说的pattern一个模式,接下来是定义模式。
13:03
那当前我们要定义的模式当然就是连续三次登录失败了,那我们看看在CP里边怎么样去定义这样一个连续三次登陆失败的模式。这里面的定义方式呢,需要用到pattern这样一个类,我们看到,呃,这里边要调用的啊,有很多pattern,我们这里边要用到的是flink CEp.pattern下面的这个pattern。要调用它的一个形态方法,这表示当前复杂事件以什么事件开始。一定嘛,开始的一个时间。啊,那对应的这个begin前面呢,也需要有一个泛型的定义啊,当然是logging event了,当前的事件类型就是logging。里边要传一个参数是一个。String类型的参数,我们看到,那这个string表示什么呢?就是表示开始的这第一个事件,它的名称,呃,这个名称主要就是方便后边我们去提取,相当于是一个标签,就是我们检测到了一组复杂事件,诶那后边要去处理的时候,怎么样表示这里边检测到的每一个事件呢?诶就用这里边我们对应的这个标签或者说名称来指代当前检测出来的事件。
14:23
每一个简单事件都可以有对应的这样一个名称,那这里面的话,开始当然就是第一次登陆失败啊,我们就把它叫做first吧。然后当前是一个登陆失败事件。显然我们还应该有一个。对应简单事件的筛选条件啊,那就是必须得是那个type等于feel才可以符合当前的要求啊,那所以这里面呢,这个条件怎么去定义,后面点where,掉一个where。表示传入一个条件里边,我们可以直接一个simple condition简单条件。
15:01
然后我们会看到这个simple condition啊,本身它是一个抽象类,里边有一个抽象方法,就是,所以我们会发现它其实跟这个filter方式有点像,就是相当于要做一个过滤,我们这里呢,返回一个布尔类型的值,如果返回false,那说明诶当前这个不符合条件,我们就不要筛选出来,那如果说返回true的话,符合条件,那就可以作为我们的begin,叫做first的这个事件,就可以把它先提取出来放在那儿。那后面至于有没有接下来的第二次,第三次登陆失败,那要看后面,所以这里面我们的定义呢。这里面return,应该return value。Even type要判断它是否equals,就是这样一个判断。所以这里边我们其实是。第一次。第一次登陆失败事件。
16:04
然后接下来啊,这个还没完,这是第一个事件,我们已经定义好了,接下来呢,后边要紧跟着第二个登陆失败事件,哎,那所以接下来这个紧跟着怎么表示呢?两个事件之间应该得有一个连接词,我们的连接词就是。可以直接点next next我们知道是下一个的意思嘛,所以就是在后面紧跟着,那当然这个next里边也得有一个string类型的name啊,那所以这里边我们就可以前面第一个叫做first,那当然第二个登录失败我们就叫了。呃,那接下来同样这里还应该有一个V,要表示当前是一个登陆失败事件,得有对应的筛选条件啊,那这个就很简单,跟前面一样吧。所以可以直接把它copy过来。这是一个第二次,紧跟着。
17:00
第二次。登录。失败事件。然后再接下来啊,其实我们知道这很明显一样的嘛,后边还有第三次登陆失败,我们直接把这个。改成。Third就可以了啊,接下来紧跟着第三次登陆失败事件。这就是我们连续三次登陆失败这样一个模式的定义,这样就定义好了啊,当然我们可以看一下得到的是一个什么东西。得到的就是一个pattern,就是一个模式,所以我们可以直接把它就叫做pattern吧。
我来说两句