00:00
到目前为止,我们已经了解了在flink c当中怎么样去定义模式,然后怎么样去做模式的检测处理,我们可以去处理正常匹配的复杂事件,也可以去处理超时的部分匹配的那些复杂事件啊,那这些过程我们都已经在具体的代码实例当中得到了应用和验证,那这里面有一个细节需要去注意,那就是当前我们在进行数据的处理过程当中,其实当前的数据是可以出现乱序情况的。诶,那这里我们会发现前面我们在做连续三次登陆失败的时候,即使把六秒钟的对应的这一个登录成功事件放到后边啊,就相当于我们是。后发生的是数据时间戳,比较大的先来,然后时间说比较小的先发生的事件呢,后到,这个也是完全可以的,并不影响最后的结果输出,那lica是怎么样做到这一点的呢?
01:07
哎,那其实我们就想到了,这里的关键就在于要使用水位线,利用水位线的延迟来处理这里的乱序数据。因为我们知道在CP当中要处理的其实就是按照一定的顺序先后发生的一系列事件,所以这里面事件的先后顺序非常的重要。啊,那对于事件的先后顺序,如果说我们现在是处理时间语义的话,那这个比较简单,那就是谁先来谁就谁就先,谁后到,谁就是在后面啊,那所以我们这里的next follow都按照这个顺序来就可以,但如果说现在是事件时间语义的话,很显然我们的先来后到应该以当前数据带着的时间戳为准。那这就有可能出现所谓的乱序情况了啊,那就是当前先到的数据呢,诶,它未必是排在前面的,有可能它排在后面。
02:08
所以整个cep的处理流程跟。前面我们提到过的flink data streamam API里的处理方式是类似的,就是利用的延迟多等待一会儿,诶那这里边提前到的那些数据怎么办呢?当然就应该先在缓冲区里边做一个缓存。所以这里我们所有的数据依次到来的时候,后边的处理流程其实本质上应该是要按照。当前的水位线mark来进行出发的,也就是说当一个事件到来的时候,它并不是直接就去处理,就去判断,诶是不是符合我们当前的第一个模式呢?是不是紧跟着符合第二个模式呢?并不是直接去做判断。
03:00
它是先放到缓冲区里面。缓冲区里边的所有数据,那就按照时间戳大小做一个排序,然后当水位线进行更新之后,比如说诶,当前,呃,我们当前的这个水位线延迟是三秒钟,那么七秒的这个数据来了之后,当前的水位线已经进展到了四秒,那么我们就把缓冲区里边所有小于四秒钟的数据提取出来,依次进行处理,进行判断,看一看他们是否能够匹配到我们的这个规则,那这样的话就保证了。整个匹配事件的顺序跟我们。所有的事件发生的时间顺序是完全一致的,那最终处理的结果就不会受到乱序数据的影响。但是呢,这个又会带来另外一个问题,就是说水位线的延迟时间,我们说它并不是严格意义上能够保证所有乱序数据一定都能够正常处理,我们只是经验性的给一个比较合理的值啊,因为我们并不知道未来的数据流里边的数据到底是什么样的嘛。
04:14
所以那如果说当前,假如说啊,当前的这个watermark延迟是三秒,假如说我们在八秒之后又来了一个四秒钟的数据,那会怎么办呢?那就没有办法了,因为相当于四秒钟的数据,在这个四秒之前啊,都已经处理完了,所以如果当前W已经超过了它的话,这个数据就会被丢掉。它所带来的那些匹配复杂事件当然也就不会被检测到。那我们就想到了,那假如说我现在不想让这个迟到的数据丢掉,那又应该怎么做呢?哎,那自然就想到了,也可以去借鉴窗口当中的做法嘛,窗口当中啊,那本身是有一个窗口可以允许一段时间的延迟,窗口可以延迟关闭,可以等待一段时间,那另外还有一个就是可以把迟到的数据放到测输出流里面去。
05:13
那我们这里可能是没有涉及到窗口的延迟触发,但是可以有特殊出流,其实在之前我们看这个API的时候,也已经看到了对应的方式。那就是基于pattern stream,我们这里边。调用这个select,除了这个之外。还有select,还有process。另外还有一个方法就是。Output late data。很显然,这样一个方法就是让我们传入一个测输出流的标签,然后把迟到的数据直接放到对应的这个测输出流里面去就可以了。哎,那所以它的用法呢,也就跟。
06:02
窗口处理延迟迟到数据的那种用法是完全一致的,我们可以看一下。当前的具体调用的方式,哎,那就是首先我们先调用CP.pattern把当前的数据流和pattern传进去,得到一个string,然后呢,我们可以定义一个测殊流的标签啊,比方说当前这个就叫做data output t。然后接下来我们就可以pattern stream,在调用select之前啊,就类似于window。我们也是可以在。调用定义这个窗口函数之前就去调这个output data啊,那这里也是在这之前调用一个output data,把标签传进来,指定把迟到数据塞到特殊出流里面去,那后边的话点select就正常处理匹配的数据了。然后最终呢?如果想拿到测试流里的迟到数据,那就是依然是result.get side output,把它获取到就可以了。
07:08
你会发现。Link cp给我们提供了完整的一整套处理复杂事件、各种情况、各种需求的一整套方法。在实际项目当中就有非常广泛的应用。
我来说两句