00:00
我们已经了解了在FNKCP当中怎么样去处理匹配到的事件,那我们会发现啊,在这个过程当中,其实关键点就在于是要把检测到的每一个个体模式对应的那个事件保存在一个map里面,哎,所以整体来讲,我们这个操作过程当中啊,就是所有的事件一个一个来了之后,接下来呢,我们就按照定义的pattern去进行比对,如果说啊,比如说像我们之前这个啊,看的更明显一点,因为有三个不同的个体模式,那假如说来了一个事件,先去判断它是否符合我们这里begin初始模式定义的对应的这个事件,那如果说符合当前的匹配规则的话,那么就把它直接保存在一个map数据结构里面啊,那对应的K当然就是first few了,然后接下来呢,继续读取后边的事件,看看能否符合它接下来的下一个个体模式的匹配规则。
01:00
那如果说符合的话,那我们继续在这个map里边去进行保存,如果不符合的话,当前事件就直接丢掉。那这里面涉及到一个就是如果说我们当前的这个连接条件啊,连接词是一个follow by的话,那有可能出现就是当前这个个体模式不符合,诶但是呢,我们之前已经检测到的第一个事件还要继续保留,我还可以去检测接下来的事件。那如果说这个连接词是next的话,那其实只要当前个体模式不符合,相当于整个我们定义的这个组合模式就已经不符合了,哎,所以这个时候呢,就会把之前map里边已经保存的所有数据全部清掉。这就是整个我们进行匹配检测的过程。所以我们看到对于一个定义好的模式pattern而言啊,那最终检测事件的结果其实往往就是两种情况啊,要不就是匹配成功啊,就是如果说我们定义的这个组合模式里边的每一个个体模是都找到了对应的匹配事件的话,那么就检测到了一组真正意义上匹配的复杂事件,这个时候呢,就可以调用我们后边比方说这里是它跟select function的话,这个时候就调用这里的select方法,把我们已经完全匹配的那个map传进来啊,那如果说后面调用的是process方法的话,那这里诶,我们就调用process match方法,同样把所有匹配的事件放在map里边传递进来,接下来就进行检测到的匹配事件的对应的处理了。
02:38
那如果说我们破坏了当前定义好的这个组合模式呢?诶,那当然就没什么好说的了,直接就把所有的事件全部丢掉,重新开始检测就可以了,所以最终的结果要不就是真正的匹配检测到了,然后进行处理,要不就是所有事件都丢掉。但是在这中间呢,有一种比较特殊的情况,那就是所谓的如果我们在里边用到了一个微调,用了within方法之后,里边可以传入一个时间范围,就可以指定我们整个这个组合模式里边第一个事件到最后一个事件,他们先后发生这个时间段到底是多长时间。
03:18
哎,那我们就想到了,如果说啊,我前面已经来了,第一个事件匹配上了,把它保存在当前的这个map里边了,然后呢,后边第二个事件也匹配上了,保存到这个map里边了,但是第三个事件还没等来的时候,现在时间到了。那这种情况按照我们的要求,这其实算一种超时未匹配,但是我们会发现它其实跟匹配失败严格意义上来讲还不一样,它应该说是一个部分匹配成功的状态啊,那所以在有一些场景下呢,我们可能要针对这种超时分匹配成功的事件,单独要做一个检测和处理,哎,那所以说接下来我们就专门来讲一讲。
04:00
Li cp当中怎么样去处理超时事件,它是有专门一整套机制去处理超时事件。那在CP当中呢,最标准的处理超时事件的方法其实是就是要使用pattern process function的特殊出理啊,那其实我们知道patternth process function里边啊,在源码里面我们已经看到了,它本身上下文里边是有一个output方法的。这个方法我们说就跟处理函数底层一样啊,可以利用它来进行一个测输出流的分流操作,哎,那所以这里边我们主要输出的是什么信息呢?诶在这里边输出的就是超时数据。那怎么样可以把超时的事件全部放到这个测输出流里边,调用上下文里边的奥OOK方法把它做一个输出呢?诶,那这就涉及到CP里边专门提供了一个接口,用于捕捉超时部分的匹配事件啊,那这个事件应该叫做部分匹配事件,所以这个接口就叫做time out partial match handle,就是超时的部分匹配的来处理这些事件的这样一个接口。
05:10
那这个接口里边呢,必须要实现的一个抽象方法就叫做process time out match,诶我们之前这个pattern process方式里面必须要现的方法是process match,那现在呢,是process time out match,它本质上其实是一个部分匹配好的数据,现在同样是保存在这个map里边,我们可以把它提取出来做一个转换处理。对于这个方法而言呢,它除了这个map之外,还有第二个参数ctx啊,那这个ctx上下文其实就是pattern process function里边的上下文,哎,所以当前的这种方式啊,使用测殊物流进行超时部分数据的一个处理转换,就只能在pattern process function里边才能够使用。那对应的超时数据呢?诶,我们可以从这个map里边去进行一个提取,这里需要注意,我们提取出来的必须是已经到达的,已经捕获到的匹配上的那些数据,哎,如果说他是还没等到的那些数据的话,当然就拿不到对应的值了,捕获到的就是空了,所以这里边呢,我们需要另外去定义一个输出标签,然后接下来把我们检测到的部分匹配的事件就可以进行单独的处理转换。
06:28
这是flink官方给我们推荐的一种用法啊,那另外呢,其实在老版本的实现里边还有一种比较简化的啊,方便的一种调用形式,那就是直接使用一个叫做pattern timeout function的这样一个接口。因为在早期版本当中,我们说flink1.8之前并没有引入pattern process function,那那个时候我们怎么样处理超时事件呢?诶,那就是基于pattern stream去调用select或者Fla select方法的时候,可以多传入一个参数啊,这个参数就是一个pattern time out的方式,这个在源码里边我们也可以看到,非常的清楚啊,比如说这里的set方法,我们可以看到它就可以传入一个pattern t out的方式,它的类型就是pattern time out的方式,当然了在前面我们也可以直接把对应的oututook tag也做一个传入啊,那这样的话就是超时数据输出到的那个测输出流的标签也作为参数统一传入了啊,那对应的这个Python timeout的方式呢,里边必须要实现的一个方法叫做tout。啊,那这个timeout就看的很明显啊,也是有一个map保存的,就是我们当前已经匹配到的部分匹配的那些事件,另外还有一个长整型的时间戳。
07:44
哎,那其实就是目前超时的那个时间点,那最终得到的数据啊,那要转换成out类型的输出数据,然后做一个输出就可以了。那如果在代码当中具体去使用的话,那整个的流程就应该是我们在外边先去定义一个测输出流的标签啊,那主要就是用来标识当前的超时,测出出流的,然后接下来呢,哎,那我们首先去定义一个点select的方法,里边传入当前的标签,后边呢,还需要传入一个pattern tout方式去指定到底怎么样去处理超时的部分匹配的事件啊,那这里边我们要实现的就是胎帽的方法,检测到的都是部分匹配的那个超时事件。
08:30
然后另外呢,后面还跟着一个参数,这就是正常匹配事件的处理,传入的还是一个pattern select方式啊,所以跟之前我们这个select方法调用的相比,就多传入了两个参数,一个是当前超时的部分匹配事件对应的那个测试数流的标签,另外还有一个就是处理部分匹配的超时事件的对应的这个pattern timeout方式。那经过这个处理之后,当然了就是得到的结果result stream,它里边的数据呢,肯定是针对正常匹配事件处理转换得到的data啊,那如果说我们想要补货之前对于超时事件的处理的话,诶,那当然就要get set output,把对应的标签传进去,这样的话就可以得到超时事件。
09:20
这就是对于超时事件的在Li cp当中两种不同的处理方法。目前官方比较推荐的是第一种方式,我们直接使用pattern process function的测试数理实现一个timeed out partial match handle这样的一个接口。
我来说两句