00:00
对于我们这个订单失效的检测啊,那上节课给大家用cep做了一个完整实现,那接下来大家可以思考一下,如果说我们不用cep。如果我们用这个前面讲到的process function data API直接可以实现吗?其实是可以的啊,大家想之前我们讲那个,呃,连续登录失败的时候,我们其实就有一个思路,就是按照事件出发,来一个事件就去判断一次,对吧,判断之前到底是什么状态,然后把这个做一个更改就完事了,那后面我们发现它主要的一个问题在于什么呢?主要就是在于我们要判断这个两秒钟之内,呃,这个这个判断顺序的时候,它是要严格警铃对吧,就紧接着就要发生,所以如果出现乱序数据的时候,是不是相当于我们这里边的这个逻辑就要判断很多,就有很多额外的逻辑判断啊,啊就要就要复杂很多,那现在这个场景呢?我们是是不是create这个下单行为来了之后,只要后边有支付有配就可以,中间你就是就算是有乱序没关系,对不对,就算是中间有一些这个modify有一些修改的这个数据对我们最终的影响是,呃,基本上是没有影响的,所以我们这里边其实。
01:17
即使是判断乱序,我是不是也只要判断当前这个支付和下单,Create和配,他们有可能有先后顺序对不对,有可能这个先来,有可能呃,就是呃,这个后来,对吧?啊,这个顺序我们只要搞清楚,所以只要是一个if的一个逻辑判断,梳理清了,其实这个问题就不大了啊,所以接下来呢,我们就用一个process function啊,底层API对这个需求做一个实现,那我们还是在order pay detect下边创建一个。新的class,当前这个我们就叫做all timeout还是啊,做这个超时的检测,呃,这个我们要检测的是就是没有CP的一种处理方式,对吧?或者说我们是用这个process方式的一种处理方式,我就直接叫做without cp吧,大家知道是用了什么方法就可以,然后前面的处理流程是不是跟前面这个应该是几乎一样啊对吧?这个创建执行环境,然后转换成读取数据,转换成骤类型分配时间窗和watermark,呃,后面这个定义pattern当然就不必要了啊,我们这里边就是直接到这。
02:26
把这个先创建出来。好,那后边还是啊,把这个画括号定义出来,最后是应该有一步Env.execute执行的这个过程,当前的这个job我们可以叫做呃,就是。All the time out。Detect without。Cep找,那接下来这个流程跟之前就会有所不同了,对吧?大家想之前我们是按照那个CP的步骤,先定义patternon,然后将patternon应用到这个流上去,然后接下来是select方法去提取,对吧?如果有超时事件的话,你再定义一个测殊物流标签,有一个patternon time out function去做这个超时事件的提取,那我们现在就不是那样的一步一步了,现在是不是统一就一步啊,大家想想是不是就就一步就直接搞定了,因为我们现在是不是对应的应该要有一个,呃,对应的这个逻辑判断里边可能要注册状态对吧,定义状态,另外是不是可能还需要有定时器啊。
03:32
因为你既然有15分钟这个判断它失效嘛,那是不是我们就可以非常简单的来了一个create,注册一个15分钟之后的定时器,到点的话,是不是就相当于超时了呀,如果到点那个配还没来,这就是超时,如果要是说配来了的话,成功支付对吧?逻辑其实非常简单,所以接下来既然又用到了状态,又用到了定时器,那是不是就是一个process function啊,诶直接用底层API搞定,这个过程就是自定义。
04:03
自定义处理函数,然后呃呃,我们这里边可以想到就是主要就是还是要检测到当前成功支付的这些事件,另外还有就是没有正常匹配,匹配上超时的那些事件,对吧?诶,那所以这里面我们是不是应该相当于要再做一个拆分啊,我们直接可以主流输出的是成功支付的,那测输出流是不是可以输出那个超时的呀,所以他们接下来的操作就可以不一样啊,那所以这里我们定义这个主流输出正常支付订单事件,那么侧输出流输出超时报警事件。那因为后边我们单独要在那个process方式里边去做这个测殊输出,那我们定义那个测殊流标签的时候,大家可能想到了,我可以直接在外边,就在这个类里边main方法外边是不是直接全局定义一个测殊标签啊,对吧?啊,我们定义超时事件的侧输出流标签,直接在这定义出来,我把它定义成private private对吧?呃,Static定义出来out tag。
05:30
当前的类型应该还是all the result,跟我们前面那个应该一样,对吧?啊,然后当前我们定义一个all the timeout tag,直接new一个order,呃,你一个output tag,那么接下来里边给一个当前的名称,这个叫order。Timeout。后边画括号把它创建出来对吧,这就是这个,呃,在外边先把这个测殊流标签定定义出来,后边我们就直接可以拿来用了嘛,这个就简单很多啊啊,所以接下来我们其实要做的就是基于之前的order stream,首先我们是不是还是要按照order ID做分组啊,因为你检测那个create配事件肯定还是针对当前order order ID的嘛,同一个order才去做这个判断,所以接下来就是K。
06:24
KBY,当前是order event get out ID对吧?然后接下来是一个process里边要去自己定义一个,比方说我这个叫order pay match detect对吧。我把这个创建出来。得到的结果这个可以叫做result stream啊,但其实大家知道,就是本身主流输出的其实是成功匹配的对吧,所以直接打印这里面得到的是paid normally。这样的一个流,然后接下来如果要是找他的那个超时事件的话,是不是跟我们前面一样,也是result stream,要去get set output啊,对吧,这里面把那个all timeout呃,Tag传进来,接下来直接做一个打印输出就可以了,这里是timeout。
07:17
这就是一个完整的流程,整个的架构跟前面还是非常的接近,对吧?啊,关键就是怎么样实现这样一个kid process方式。
我来说两句