00:00
我们用这个流式的数据源给大家来测一下这个代码,我们把代码里边改成ET文本流啊,起一个这个端口是7777,所以这里边NC-LK提起来,然后这边运行一下当前的代码,接下来我们还是一条数据一条数据去做一个输入。首先哎,这里边34729这条订单,842的时候输入一条,这里边没有任何反应,这个是正常的对吧?啊,因为当前你这个就没有任何的想要去触发的操作嘛,然后呃,843的时候再来一个84730,现在我们创建了两个订单,然后接下来呢,诶大家看这个84729就来了一个支付的事件,对吧,直接配了一个看一眼啊,大家看这里面没有任何的输出,为什么没有任何输出呢?难道不是说我这里边来了一个这个支付,这应该就已经匹配上了吗。这里大家要注意一下啊,这就是我们之前给大家提到的CP底层是怎么样去处理乱序数据的呢?其实还是要结合water的这个机制,要做一个延迟的,对吧,你像现在我们有没有water呢?
01:17
我们现在有没有auto mark呀?诶有同学说没有,那大家看一下之前我们这个既然是事件时间语义,怎么会没有auto mark呢?有的,我们现在是升序数据对吧?哎,只是没有专门去定义wateralmark的那个延迟时间而已,但大家知道当前的watermark延迟是多少多少一毫秒嘛,所以当前你如果输入这个,呃,844这条数据的时候,当前的watermark是多少啊,844再减去一毫秒对不对?哎,那我们这个CP底层它去处理数据的时候是怎么做的呢?它是要按照water mark,然后去处理对应的数据,也就是说什么842这个数据来了之候,不是马上去处理的,先把它缓存起来,是等到water mark涨到842的时候,到达842的时候,接下来我再把它拿出来去做判断,哎,然后去看,诶是不是匹配上了我们这个模式序列里边的那个create呢?啊对吧,匹配上之后。
02:17
当前这个,呃,当前我们这一组这个事件序列里边create那个map里边是不是就把把当前这个create事件存进去了啊,然后接下来我继续去匹配后面的对吧?每一个事件都是要W涨到当前的这个时间点的时候才会去做处理,在这之前它都是缓存起来的,所以现在大家发现844这个数据我们输入是输入进来了,但其实它还不被做处理,对吧?也就是说当前我是不是这个34729还只有create没有配啊,是不是类似于这样一个状态对吧?啊,所以接下来大家看到啊,我给一个这个334730的一个845来了一个它的一个配啊,大家看一下这个效果是什么。
03:02
它输出了一个34729的成功支付的结果,为什么会出现这个结果呢?本来我输出的是这输入的是这个730的,呃,支付的这个数据啊,为什么输出的是729成功支付了呢?关键在于现在wal改变了wal ma是不是变成呃845再减一毫秒了,那是不是844的数据可以处理了,哎,所以接下来我发发现之前我匹配的这一组数据是不是34729有create有配,是不是成功匹配啊,成功匹配之后在代码里边,那就要调用我们定义的那个Python s select方式,是不是包装输出这样一个order result,呃,Paid successfully这样一个输出结果啊,所以现在大家看到它实时输出是在这儿去输出的啊,这里面有一毫秒的延迟,所以我们看起来好像是得到等到下一秒的数据来了之后,当天才才输出一样啊,事实上是什么呢?你假如说这个时间戳能精确一点的话,你只要。
04:03
过上一毫秒之后,就可以输出之前的这个结果了,对吧?延迟其实只有一毫秒啊,然后接下来大家会想到那这个34730的这个成功支付,大家看这已经成功支付了,对吧?它会什么时候来呢?诶,当然就是时间戳再往前推进的时候,比方说我们来个这个34731的create,大家看只是create而已,这里边输出了一个34730的成功支付,对吧?啊,这个大家就是只要这么试一试,其实就知道到底是怎么回事了啊,那当然如果说我们想测它的这个就是超时的话,那当然就是这个时间戳要不停的增大,然后water map要不停的推移,对吧?我们来看一下当前这个731,如果要超时的话,0846它要到什么时间点就超时了,应该得到加900对吧,是不是1746啊,哎,所以我们直接往后看啊。呃,这个好像没那么大的啊,我直接随便给一个吧,我们随便找一个这个create啊,34767,然后我改一下它的这个时间戳。
05:08
这34767啊,然后这里边我来一个1746对吧,我直接到这儿,但是大家看到这里边没有输出结果啊哎对大家想到是不是还是因为我们当前的这个时间戳,诶是要减一毫秒才是water b对吧,现在时间没到1746呢,那当然他这个没超时呢,没到点呢嘛,诶那这里边我来一个还是啊来一个1747,再往后推移一下,现在大家看是不是超时报警啊,诶报警它的这个时间是什么点呢?还是1746对吧,它真正那个超时的这个,呃时间点要的是1746达到这个点去去呃就去做一个报警,所以这里边只要我们超过啊呃,这个幺达到那个,呃1746加一毫秒的那个数据,如果进来的话,那么watermark就达到了1746啊,那就相当于这里边就可以触发这个我们报警的那个信息了啊,所以这个其实就是大家能够看到的流式数据输入的时候真实的一个状。
06:08
态对吧,所以是什么呢?就是数据,如果说它正常匹配上的话,那其实是非常实时的啊,就是那个配置事件来了之后,马上就输出结果,相当于马上对吧,我们这里面一毫秒延迟嘛,那如果说匹配不上的话,超时的话,那是什么呢?等到它定义的那个15分钟之后,900秒之后的那个定时器出发的时候,那个点马上输出超时结果,对吧?所以这里面的这个延迟都非常非常的小,是实时的输出了一个最终的结果。
我来说两句