00:00
有了这个模式之后。接下来的第三步,关键是要把这个模式要应用在。对应的事件流上,那就开始检测匹配的那些复杂事件了。所以接下来是将。模式应用到。数据流上。检测复杂事件。当然了,下边还有一步操作。那就是要将。简。测到的复杂事件。提取出来。啊,那我们提取出来的可能就是一组事件,那我们当前检测到一组,那就应该对应的经过处理之后,输出对应的一个结果,做一个报警信息的输出,啊,那所以就是提取出来之后。
01:00
进行处理。得到报警信息输出,那我们这里的报警信息直接就变成一个string类型,直接打印出来就可以了啊,就做了一个简单的报警。就是这样的两步操作,所以这里边我们首先要做一个模式应用到数据流上的一个一个步骤,那这一步呢,主要是要调用。CP我们看到啊,本身就有CP这样一个类调用它的点pattern方法。那这样一个CP.pattern方法呢,它是要传入两个参数,一个就是data当前的数据流,那另外一个就是我们定义好的模式。那所以这两个参数传进来之后,它会得到一个什么东西呢?诶,我我们其实可以看到它会得到一个叫做pattern stream的一个新的数据类型。其实这个过程跟我们之前讲到data STEM API的时候啊,整个这个处理转换的过程非常的类似,就是我们之前比方说啊,讲到window API的时候,那假如说我们应用了一个点window操作,那。
02:14
相当于是定义了这个窗口分配器啊,那得到的呢,是一个window,得到了另外一种数据类型,然后window strip再去定义对应的窗口函数之后,经过处理之后又会得到一个。那别的其实也一样,你像我们做两条connect连接的时候,那都是一条流data stream。点。这个得到的是一个connected streams,然后在后边再去点map,或者做process Fla map这样的操作,处理完了之后又得到了一个data,所以现在也是很明显是这样的两步操作啊,就是先得到一个pattern stream,然后后边很明显就是做转换计算再得到一个。
03:04
啊,所以接下来就首先是这一步操作,传入对应的数据,那我们这里边直接把这个logging string传进来可以吗?注意我们这里边还应该要对当前的user ID进行一个分组,要不然的话,所有数据如果统一按照这个feel或者success去做这样的一个检测的话,那这个就不对了。啊,那所以这里边本身CP啊,它这里边传的data stream也是支持K,如果我们定义了K的话,接下来所做的所有操作都是基于当前K。那当然了,K stream本身也继承自data stream,所以这个是没有问题的,传在这里是没有问题的,我们可以直接做一个K,那当前本来的这个event要提取一下里边对应的字段。刚才的key是user ID,这是第一个数据流这参数,那后面呢,当然就是patternon了,把pattern传进来,得到的是一个pattern string。
04:07
这是第三步,然后接下来就是对这个pattern要做一个。转换处理了,诶那我们可以看一下这个stream可以调什么样的方法呢?它可以调select。Side output data啊,那还有Fla select,另外还有process啊,主要就是可以调用这些方法,那这中间的这个output data很明显它是用来处理到数据的啊呃,延迟数据的,那这里边呢,涉及到的就是把它放到这个里边,跟窗口的那个操作非常的像,这里我们不涉及,我们只要看别的一般的这种处理就可以了。最明显的处理当然就是select select select很明显就是类似于map和map嘛啊,所以我们就直接用一个最简单的select来看一看。因为我们是想把对应的复杂事件提取出来吗?Select不就是提取的意思吗?选择提取的意思,那它里边呢,需要传入的是一个。
05:08
Pattern s的方式,这跟之前datapi的用法几乎就完全一样啊,我们之前就是点map里边传一个map方式,点filter里传一个filter方式,点process里边传一个process方式,那这里面点select传的是一个pattern select方式。那这里面我们看到select function还是有两个型的,它分别表示什么呢?很明显这就是我们当前的输入数据里边的类型,以及转换之后输出数据流的。输出类型,那所以这里面就是一个硬一个out嘛。类似于一个map的处理转换,只不过当前我们要处理的数据是一个复杂事件,是一组事件,然后输出的呢,是一条数据。所以接下来我们里边直接可以你一个。
06:03
Pattern。Select function。我们最终的输出。报警信息嘛,直接字符串打印输出就可以了。里边必须要实现的是一个select的方法,然后这个select的方法很明显传入的就应该是当前检测到的复杂事件,那输出的呢,就是我们想要输出的那个报警信息了。那这里边的复杂事件,我们看到它是用一个map来进行保存的,为什么是一个map呢?那这里边的map key又是一个string,然后呃,对应的value是一个list logging类型的list,为什么是这样一个数据结构呢?这里主要是涉及到我们当前复杂事件,它可能有一组好多个,哎,那所以怎么样去区分这里面的第一个,第二个,第三个这些事件呢。用它的名称,之前我们说的啊,用它这个名称,或者说它的这个标签来进行区分,那所以这里的key。
07:03
这个string类型的key就是前面我们给定的每个简单事件的名称。那根据这个名称就可以直接把对应事件提取出来了,那这里面的value我们提取出对应事件应该是一个logging event类型啊,为什么还把它包装了一个list呢?这主要是考虑到有些当前这个简单世界定义的时候还可以定义它。重复发生啊,那后面我们会提到就是如果定义了重复发生的话,那显然这一个标签里边检测到的事件就有多个了,诶那所以这多个事件重复发生的事件怎么表示呢?放到一个列表list。当然,现在我们并没有重复发生的事件,每一个标签里边对应检测出来只有一个事件,所以当前list的其实就是长度唯一的一个例子吧,我们直接取单独唯一的那一个数据出来就可以。所以接下来我们可以做一个提取。
08:04
我们可以提取。复杂事件中的三次登陆失败事件。哎,那这里面我们可以首先提取出来,当然都是logging event了,首先应该有一个first。我们提取的时候,当然就是用这个map去做一个get get里边传KK是first第一次登录失败啊,那因为我们拿出来是一个例子的嘛,而且我们知道这个例子里边只有一个元素,所以我们可以直接GET0把它获取出来就可以了。那类似的。后边的第二次登陆失败和第三次登陆失败其实是一样的。我们可以在这里。把它叫做second和。Third,那对应的呢,就把这个K改成。
09:00
Second和third就可以了。诶,这样的话,后面还是直接GET0,因为当前每一个标签下面只有一个事件嘛,啊,所以接下来有了这三次登陆失败事件,那很显然我们就可以从中提取一些信息,把它包装包装组合一下,得到一个报警信息输出了啊,比如说我们就输出一句话,就是说哪个用户,然后连续三次登录失败,然后它的登录失败的时间分别是哪三次啊,那接下来我们这里面return就是用户ID啊,用户ID的话,我们这三个事件它的key都是一样的嘛,我们随便取一个拿到他的用户ID就可以了。然后加上一句话。那这里边就是连续三次登录失败。然后他的登录时间可以跟在后边做一个。做一个声明啊,那这里面我们要叠加的,当然其实就是。
10:01
可以加上当前。First。Event。A fair event对应的time stamp,然后再这个加一个逗号分割,那后边对应的再叠加上second和third。这里我们把这个对应的。事件改一下。Sir。这样的话,我们就可以包装出对应的。报警信息了啊,那最后把这个拿到之后,这个我们可以叫做。Warning street。我们是不停的有事件输入,然后在不停的实时检测啊,所以检测出来的报警信息也是一条流啊,当然了这一个输出的频次可能就不会像我们原始的输入数据那么高了啊,那这里最终接下来就可以做一个打印输出。
11:01
warningre.print。最后不要忘记整个流要把它执行起来,那外边的话,我们需要有一个。Exception,这就是使用CP进行登录失败连续登录失败检测的一个完整的案例,我们可以运行一下,看看效果怎么样。我们可以看到果然输出了USER1连续三次登陆失败,分别是两秒钟,三秒钟和五秒钟,哎,所以呃,这里我们看到USER1他换了这个IP进行登录,这个对我们登录失败的检测并不影响,因为我们知道很多情况下,黑客如果要做这个攻击的话,很有可能他是要切换IP去做攻击的啊,那这个我们关键是看当前的user,当前的用户到底是不是连续登录失败就可以了。
12:01
这是符合我们的预期的啊,那U2就没有检测到,那假如说这里我们把中间的这一行注掉,它的这个success注掉,那很明显,如果我们再次运行的话,就应该USER1和USER2都检测到连续的三次登陆失败。那这里还有另外一个问题,就是我们前面提到。这里是按照顺序依次发生的,那假如说。我们直接把USER2的。登录成功的这一次数据,六秒钟的这个登录成功放在最后边的话,那是不是当前它还是一个连续三次登录失败会报警呢。按照道理来讲,我们不应该报警,因为这只是数据输入,数据乱续了而已,跟之前还是一样的啊。那所以我们可以检测一下这种情况CP能不能够正常处理,当然了,在这种场景下呢,我们要注意就是wal mark的这个延迟时间就不能给zero,如果给零的话,那很显然这条数据会被丢掉嘛,迟到数据就直接被丢了,那我们当然就检测不到了啊,就相当于这个连续三次登陆失败,就认为是匹配上了嘛啊,那这个登录成功,这条数据被丢掉了。
13:15
那所以这里边呢,我们至少要给一个把这个当前的乱序时间能够涵盖到的一个watermark延迟,比如说我们给一个三秒的延迟。好,那接下来的话,这个八秒之后又来了六秒的数据,这个显然是可以处理的嘛,所以接下来我们再看一下运行一下。USER2还能否检测到它是连续三次登陆失败?诶,我们看现在就检测不到了,只有USER1是检测到了,进行了报警。得到的结果是完全符合我们预期的,CP可以把这个需求完美的解决。这就是使用CP进行连续三次登录失败检测的一个具体的应用案例。
我来说两句