00:00
我们已经了解了,在CP当中,我们可以去处理检测到的匹配的复杂事件,也可以呢去处理部分匹配的超时事件啊,所以对于CP而言啊,它能够处理的场景是非常丰富的,那这里我们会发现所谓的复杂事件,关键就在于要去定义一组事件先后发生的顺序,哎,那在这个过程当中呢,如果说我们当前是事件时间语义的话,那有可能事件到来的顺序跟它本身发生的时间可能是不一样的,这就是我们所说的乱序数据,那乱序数据到底应该怎么处理呢?诶,我们可能会想到,对于当前这个代码里边,之前我们测试的过程当中都是按照时间顺序啊依次升序排列的,所以在这个过程当中能够正确的处理数据肯定是没有问题的,那假如说我们这里面出现了乱序数据,又会怎么样呢?诶,那比如说啊,我们之前的这一个logging file里边,这里边我们这个SUCCESS6秒钟的这一个登录成功,假如说我们换一下位置,把它直接换到八秒钟后。
01:07
那如果是这样去做操作的话,是不是就代表着前面U2就会已经出现连续三次的登录失败,检测就会报警了呢?呃,其实不会的啊,在CP底层,在事件时间语义下,它同样是通过水位线来控制我们当前时间的进展的,所以当某一个事件到来的时候,它并不是直接就去做检测匹配的处理,而是先放到一个缓冲区里面,先把它缓存起来。那所有在缓冲区里边的数据呢,会按照他们的时间戳从小到大进行一个排序,那当水位线发生进展之后呢,就会把时间戳小于水位线的所有事件全部拿出来,然后进行检测匹配,那至于当前水位线之后的那些数据呢?诶,那这些数据还是缓存着的,他们并不会被马上处理。
02:00
所以在这个过程当中,我们如果要出现乱序的话,那怎么样就能正确处理呢?当然就是设置一个水位线的延迟就可以处理这种场景啊,所以在这种场景下,假如说啊,我们这个八秒之后又来了六秒的数据,那这里就可以。在提取时间戳和分配水位线的时候,不要去直接调用升序的时间戳的提取策略,而是signmp and watermarks,然后接下来里边去调用一个water mark strategy。啊,那我们去for bounded out of orderness这样一个处理乱序数据的方法啊,那里边呢,我们要去传入一个duration,这里边可以直接指定,比如我们这里可以看到啊,八秒和六秒两个数据,它的乱序程度是两秒钟,哎,那我们这里涵盖它的话,可以直接给一个三秒钟的延迟时间,这样就肯定没有问题了。那定义这个water mark strategy的时候呢,我们还需要必须去with time step a sign啊,那这里边我们要指定怎么样去提取当前的时间戳,哎,那所以这里边可以你有一个。
03:08
Lizable time stamp aign啊,那里边数据的类型当然是logging event里边必须要实现一个extract time sum这样一个方法啊,那提取time sum字段,那这样我们就是从数据的time sum字段里边去提取时间戳,然后按照三秒钟的延迟去生成水位线。这样的话,我们接下来就可以正确的处理乱序数据,然后进行对应时间的判断啊,那所以接下来呢,我们还可以在后边给定一个,首先这个顺序我们就明确了,然后在后边我们进行这个模式定义的时候呢,哎,当然也可以有一个限制啊,比方说我们可以直接VC多长时间范围内啊,那当前我们可以定义一个time。这个time呢,要引入的是flink streaming API window time.time那这里边比方说我们就是在十秒钟之内啊,那当然了,这里所有的这个登录失败和成功的数据都是在十秒钟之内发生的啊,接下来我们可以运行一下。
04:11
看看对于这个USER2,它的三次登录失败是不是会正常的检测出来,那我们看还是检测不出来,因为当前的这个success登录成功六秒钟的事件,其实还是插入到八秒和七秒之前的,哎,所以检测的结果依然是正确。这就是关于我们在CP当中跟时间相关的数据啊,如果说出现迟到数据,出现乱序数据的话,怎么样去处理,基本的一个想法就是设置一个水位线的延迟。那自然我就想到了,之前说过水位线延迟时间的设置肯定不能设太大,哎,那它其实是不能保证我们所有的这个乱序数据都能被正确处理的,那假如说有一些数据,它的乱序程度就超过了我们这里设置的这个最大的延迟时间呢?那这个时候这个数据会怎么样呢?那当然这个数据就没有办法被正确处理了,相当于迟到数据就会被丢掉。
05:10
我们自然就有一个考量,跟窗口的操作一样,至少这个数据不要丢啊,能不能我们对于这个水位线已经没有办法正确处理的这些迟到数据,还能让他输出到另外的一个地方,至少能够获取到对应的迟到数据不让他丢掉呢?诶,那自然是有这样的方法的,那就是另外去创建一个测输出流,把迟到数据输出到测输出流里边去另行处理啊,那所以在代码当中怎么样去做这样的一个测试出流的操作呢?呃,其实也非常简单,之前我们在pattern stream。本身能够调用的方法里边应该也能看到有一个比较特殊的方法,就叫做side output later data,所以这个方法其实跟窗口里边可选API的那个方法是一模一样,它也是可以直接传入一个data data的标签啊,对应的这个测试出流的标签,接下来呢,就可以把我们当前水位线延迟没有办法正常处理的那些迟到数据直接放入到对应的测数出流当中去,那接下来得到的这个data swim啊,我们经过转换处理之后得到这个主流,再去调用一个get side out的方法,传入对应的标签,就可以捕获到相应的迟到数据了。
06:28
所以本质上来讲,在cep当中处理迟到数据,它用的方法跟窗口里边用的方法其实是类似的,只不过呢,窗口里边我们可以使用watermark延迟,然后呢,还可以基于窗口去指定一个允许延迟的时间啊,那最后呢,再来把迟到数据放入测试流,我们现在没有窗口允许延迟的时间了,只是用到了水位线延迟以及测试出流两种方式。这就是CP当中对于迟到数据的处理。
我来说两句