00:00
接下来的核心任务就是实现这个自定义的K的process function,因为这个是做过KBY之后的,对吧,我们如果要在里边定义state的话,也应该是k state,以当前的这个order ID为为准嘛,做这样的一个操作啊,所以接下来我把这个copy一下public static class,好,那么当前既然是kid process function,那应该是extend,对吧,因为它本身是负函数嘛,Key的process function里边的类型。KIO,那当前的K我们是用那个方法引用做的,所以这里边直接就是长整型的这个al ID对吧?然后接下来是输入输出,那当然就是输入all event per类型输出是不是那个result啊,我们定义的这个比较明显啊,所以直接可以写出来。Result。
01:00
这个类型写完了之后,大家要把这个all object的这个输出类型是不是改成all result呀,All the result,这样的话类型就匹配了,接下来在里边我们其实想要,呃,大家想我这里边需要需要不需要保存的状态呢?来了一个事件之后,哎,那当前来了一个事件之后,我们当前事件有可能是create,有可能是pay对吧?诶,那当然我就是判断它的那个事件类型就可以了,但是之前到底有没有来过,比方说我现在来了一个配,之前有没有已经来过create,这个是需要是不是要保存状态我才能知道啊,因为我当前数据并不知道之前来过什么嘛,哎,所以这个就保存成状态不就完了吗?所以这个非常简单,我们这里边定义状态保存就是。之前订单是否已经来过create或者pay的事件,那所以这里边我们可以定义两个状态,这两个状态是不是都用value state就可以表示啊,而且就是来没来嘛,是不是就是一个布尔类型就可以啊,Bully,所以is paid,对吧,这是一个状态,我叫做is paid state,另外还有一个叫value state,布尔类型bullying,然后是is created state,把这个声明出来啊,那当然了,这里边我们需要去在open生命周期里边用运行时上下文去做一个处理,对吧?呃,Is paid state等于get runtime contacts,然后get state new,一个value state script里边给一个名字is paid,然后是布尔类型的class,大家知道这个默认应该是。
02:54
肯定是没有,没有支付过对吧?啊,肯定是false啊,默认是一个false,诶这个其实我直接可以copy一下就好了,整个流程差不多,接下来是不是应该还有一个is created state,那接下来同样也是,呃,创建一个描述器布尔类型对吧?注意这里边的这个名称是不是必须改变啊,对吧,Is created啊,如果要是同样的名称,大家想那flink是不是就错乱了,他不知道你到到底是哪一个对吧,到底哪一个状态啊,所以这个名称必须不一样,然后后面也是呃,沃尔类型,接接下来当前的这个初始值是false,就是没有来过对吧,什么都没有来过。
03:36
那呃,我们的主体处理流程应该是什么呢?其实非常简单,就是我来了一个当前来了一个create,那大家想正常来讲应该是create先来对吧,那么他先来了之后,如果要是没有配的话,我是不是就应该把它啊,首先把它这个状态先保存更新状态对吧,然后是不是把。当前应该要注册一个15分钟之后的定时器啊,哎,所以是有这样一个操作的,然后另外就是说,呃,大家想如果说接下来那个如果要对应的那个配来了之后,是不是我就直接已经匹配到,既然有create来了,配又来了,判断一下时间戳在15分钟之内,那是不是直接输出输出这个正常匹配就完事了,对吧,实时输出嘛,哎,那如这个时候是不是我们定时器还在啊,那家想到是不是应该有一个删除定时器的操作,哎,所以这里边清空定时器,那是不是必须得知道之前定时器的时间戳,哎这个问题就又回来了,所以这里边是不是我们定义状态的状态的时候还应该要有一个对保存保存。
04:44
定时器时间戳对吧,它应该有一个这个时间戳做一个保存,那当然这个也是一个value state里边是一个长整型,保存的是timer ts state哦,那在下边我们做这个处理的时候,这里是timer ts state,同样的get runtime contact get state里边是一个value state script是长整型,里边定义它叫timer ts长整型class。
05:17
点plus对吧?当然这个我们就不需要再去给初始值了,默认初始就是闹吗?没有吗?这就是前面我们对于状态的定义,接下来必须要实现的方法,Process element,每一个数据来的时候都会调到这里来,那我们在这里做这个判断之前,首先先拿到当前的状态,先获取。当前状态,我干脆就把所有的状态都拿出来吧。两个布尔类型的状态is paid,那这个是is paid state.value那另外还有一个是is create,那这个就是is created state.value另外还有一个长整型的时间戳timer ts,这个是timer ts state.value对吧,先都拿出来再说。
06:11
接下来当然就是判断当前的事件类型去做if else,这个分情况讨论了,对吧?接下来判断当前事件类型,首先if,大家想首先我要判断的是不是诶,是否是create对吧?所以这里面create,如果equals当前value点我们那个叫even type,对吧。如果当前是这个,这是第一种情况啊,如果来的是create,那接下来要干什么?来的是create,是不是我还得判断之前到底有没有支付过啊,啊,因为确实有可能已经支付过了,大家想如果要是我的对创建订单和支付事件离得很近,而且又有乱序的话,是不是就真的有可能支付先来啊,那所以我们得把这个乱序都搞定对吧?都得处理清楚,所以如果来的是create的话,要判断,要判断是否支付过,所以接下来又一重衣服对吧?啊,接下来这个是做判断,当前是是不是就是一字配的呀,呃,状状态已经提出来了嘛,直接判断就完事了,那接下来是1.1。
07:30
这种情形是不是如果已经正常支付,诶,那大家想这里边我们是不是如果已经正常支付的话,现在既然是乱序嘛,它应该是离得很近对不对?诶当前这个顺序是离得很近的,所以这里边我其实可以直接它肯定这个不会超过15分钟对吧?哎,那肯定就直接就输出这个正常匹配的结果就可以了,所以我们呃输出正常。
08:01
正常匹配结果。所以这里边的正常匹配结果在主流里边,那当然就是out.collect对,里边来一个all result里面的包装当前的order ID对不对?Get order ID后面来一个当前的状态对吧?这个是paid的对吧?啊,这个或者我们可以多写一点啊,Paid的successfully。Successfully对吧,这是当前的一个最正常的一个状态啊,啊,那当然这里边既然你已经正常支付输出结果了,那大家想我是不是该清的状态应该要清掉啊,那要不然你永远不清状态的话,后边我们是不是状态会越堆越多,哎,所以这里边还应该有一个清空状态,另外还有什么是不是删除定时器啊,对吧,这个操作我们就在这儿去做了啊,那清空状态就是前面我们定义的啊这个意意思。
09:03
Created state clear,然后is paid state clear,其实其实大家知道这个created state正常来讲应该没有对吧?哎,但是这里边就是为了就是大家这个代码简单一点,你不用考虑那么多了,是吧?呃,我就也不管他到底之前来过没来没来过啊,有没有值直接清空,这是最最安全的对吧?啊,最靠谱的方式啊,那另外我们还有一个那个timer state是不是也得做一个清空啊,所以我先做一个删除。呃呃,或者这个其实无所谓,因为大家想前面我是不是这个已经保存到当前那个一个变量里了,叫做这个timer ts里了,哎,所以那现在你如果这个直接清空,其实也是没关系的啊,Timer ts直接清空对吧,然后下边我再做这个timer service的delete even timer timer对吧?啊做这样一个删除,那这里面大家要注意,就是有同学可能想,那当前这个到底有没有定时器呢?你没有的话,这里面直接删不是会报那个空指针异常吗?
10:05
诶,所以这里面就涉及到一个问题,就是这里如果要是之前已经支付过的话,那会有一个问题,就是配事件先来了之后,他要不要也要注册一个定时器。因为这里面有一个问题,就是说配来了之后,它是不是要等对应的那个create呀。所以这里就有一个问题,你是无限等下去吗?那假如说出现这种当然是小概率啊,就是出现这种非常非常小概率的事件,我们那个create事件就丢了对吧?日志里边就没收集到,那你想当前是不是相当于我这个配来了之后就会无限等下去啊,那这个状态永远不清对吧?哎,那这个显然就不合适了,所以我们想他到时候来了之后也要定义一个定时器对吧?哎,那后面我们再说啊,处理那个配的时候再说,所以当前我们呢,就是只要两个都匹配到那之前就是一定都有定时器的,就是只来一个的时候,我们都要去注册一个定时器,这样来第二个的时候是不是肯定有定时器啊啊,这个就直接删就没问题了,对吧?这个我们是按照这个if else逻辑判断把它搞定的,好,这是最最基础的这个情况,呃,If paid的啊,那然后接下来是不是else啊else。
11:22
现在是1.2的情况,呃,那如果要是没有支付过,那怎么办呢?这其实最正常的情况对吧,前面这个其实还是那个乱续先,那个配支付的时间来了,我们现在如果要是支付没来,Create先来,这是最正常的,那是不是注册15分钟定时器等待啊,对吧?所以注册15分钟后的定时器开始等待。呃,支付时间,所以这里边我们要去把这个这个定时器时间戳我们先单独列出来啊,因为后面还要保存对吧?啊,所以当前我们应该用的就是是不是以当前value的time Sam以它为准,再加上15分钟,其实大家知道就是15乘以60,是不是900秒啊,然后得到的结果这是秒,我们定义的定时定时器时间戳应该是一个毫秒,所以再乘以1000。
12:23
那接下来ctx register time service对吧?Register even time,事件时间按照事件时间到时才去做一个出发TS传进来,那另外是不是要更新状态啊更新状态。那当前的这个timer ts state做一个update,把TS放进去,另外我们当前既然已经来了create,是不是应该把is created set update成一个处啊,诶,这就是我们这个第一种情况来的是create基本的一个处理的每一个具体的步骤啊,分情况讨论就可以了。
13:05
好,所以这里边我们第一个分支这个就搞定了对吧?然后接下来当然就是if完了else对吧?来注意这里面其实应该l if对吧?因为我们接下来是不是有可能根本就呃就是对来的就不是create,也不是pay,有可能有别的嘛,Multifi对吧?啊,就是这个做了一个修改,修改的话,修改订单是不是跟我们这个超时一点关系都没有啊对吧,我们计时还是按他创建订单的那个时间来算的啊,所以这里边我们还要单独判断一下,如果来的真的是pay的话,对吧?啊,这个pay.equals当前value.get even even type,这里边是第二种情况,如果来的是。配支付事件。那是不是接下来要。要判断是不是之前是否已经有create来过啊,对吧,因为之前我们说的有可能有乱序嘛,还有可能真的就丢了,对吧?呃,要判断呃是否有下单事件来过,那接下来又是一个if if这个也简单,是不是直接is created啊,我们已经把这个提取出来了啊,首先第一种情况。
14:21
诶,这个2.1,这是已经,呃有过下单事件。下单事件,哎,那这里边会有另外的一个要求,大家想,那既然已经有了下单事件事件,然后又有了一个配,那现在不就是正常匹配直接输出主流,这个匹配成功就完事了吗?还没那么简单,因为大家想当前是不是也有可能他的时间戳已经超过15分钟了呀,诶当然这个比较极端啊,就是如果出现这种情况,那是什么呢?有同学说那你出现它已经这个超过15分钟了,为什么定时器没触发呢?
15:01
因为大家想我们定时器触发的要求是不是必须得water mark到达这个15分钟之后的那个时间点啊,而当前我是不是有可能有乱序数据,就是我当前比方说啊,我定义了一个900秒之后的那个,呃,比方说我我就是零零秒开始注册对吧,然后注册了一个900秒的那个定时器,那大家想现在我的watermark是不是有可能只到了八百九十九十八秒,对吧,但是我的乱序数据乱序程度比较大,我直接来了一个902秒的一个支付事件,是不是也有可能啊。当前时间是不是还没到900秒,没触发定时器对吧,但是不是已经来了一个配啊,它的时间戳是不是已经超过900了,所以它其实应该是一个超时事件对吧?啊,所以当然这个比较极端啊,啊所以这里边我们还是把这个具体场景再控制的细一点吧,我们把这个也检测到啊,所以要继续判断。判断呃,当前的这个时间戳对吧,时间呃支付。
16:08
支付的时间戳是否超过15分钟,哎,那所以这里边我们其实就是继续if,这里边要判断的是不是就是value.get time sum对吧,当前我我这个事件支付这个事件的时间戳,哎,然后乘以1000,这就是那个毫秒数,我要判断什么呢?诶,是不是就是是否在之前我的这个timer ts之内啊。大家想time ts是不是就是我注册的那个定时期要出发的时间戳,也就是15分钟之后的那个时间点,对吧?应该的超时时间如果小于,这是说明什么?这是2.1.1对吧,这是什么?这是不是相当于没有超时啊,对吧。就是在,呃,15分钟内没有超时,是不是正常匹配啊,诶所以接下来我们来做一个out点,你有一个order result里边就是当前value.get order ID对吧,然后输出一个结果,这个结果跟前面其实是一样的,还是成功支付对吧,完全一样。
17:28
然后大家想到哦,这里边我如果要做这个处理的话,那就应该接下来是不是也是清空状态啊,对吧,所有的这个定时器和状态直接清空放在这儿好,然后接下来else,那如果说当前并没有做过这个,呃,就是当前已经大于等于超过我当前定注册的那个定时器了,那怎么办呢?对,大家想2.1.2对吧,那当前是已经超时,那是不是输出测输出流报警对吧?诶那所以现在就不是out.click c呃,out.click了,而是Ctx.output还记得特殊流对吧?要传一个特殊流标签,我们那个叫order the timeout tag,然后另外当前的这个数据,我再去你一个order result对吧,Order result里边当前的。
18:28
呃,这个value.order ID把这个get到,然后接下来,接下来是不是给一个,正常来讲我们直接给那个timeout,但是这个好像又有点区别,因为这个应该叫是支付了,但是超时了,对吧?呃,它还不是那个就一直没支付的那种状态,所以我把它叫做paid but already timeout就给一个,给一个特殊的具体判断,对吧,然后大家想接下来是不是还是要有这么一套。既然你已经输出了嘛,那是不是还是状态都清空,然后定时器删除啊,哎,所以大家想那没必要嘛,IFL4里边都有这样一个逻辑,那是不是我接下来直接在外边不管怎么样清空状态就完事了呀,对吧,就当前既然你已经检测到有create有配都已经处理成功了,那就直接清空状态完事对吧?啊,所以这里边就是统一清空状态。
19:25
清空状态,这就是这个处理流程啊,稍微有一点复杂,我们主要就是考虑到了一些边边角角的这种极端的情况啊,所以这里边控制的就更加精细了,那这个是。2.1对吧,呃,If里边的这个2.1下属的这个情况,那接下来是不是还有对应的else啊,Else里边现在就是2.2,那大家知道2.2的话,2.1是已经有过create对吧,那2.2就是没有没有,呃,这个下单事件,那是不是相当于现在肯定是一个乱序啊,对吧,乱序。
20:06
啊,那么接下来我们应该干什么,是不是要对也是注册一个一个定时器,是不是要等待下单事件到来啊,对吧,如果这里边等到的话,那大家想下一个下单事件来的时候,状态是不是就进入到这儿来了,就是create,然后一配的这种情况了,直接就输出了,对不对?哎,那如果要是等不到的话,那他想等不到的话是不是。我到时候也要触发一个定时事件输出,说create没来啊,对吧啊,就是可能出现了一些异常对吧?啊,那所以这里边就会有一个有一个想法,就是我到底注册什么时候的定时器呢。Ctx对吧?CTx.timer service register even time timer注册一个什么时间呢?诶那大家想我是不是直接首先还是基于当前的这个时间戳对吧?那基于这个时间戳,有同学可能就想是不是我再加上一个我定义的那个最大乱序时间就可以了呢?
21:10
其实没必要,大家想一下。因为怎么样呢?假如说比方说我现在啊,八点钟来了一个支付时现,你说我当前就是那个create应该在几点钟来,它的时间说应该在几点钟来,是不是他应该肯定就在八点钟之前要来啊,所以我这里边还需要在比方说八点再加两秒再加三秒吗?不要我怎么样,直接就给八点乘以1000准准的那个毫秒数放在这儿注册一个定时器,那它触发的时候就是什么呢?是不是要等到water mark涨到八点的时候才出发啊,是不是就相当于已经把watermark的那个延迟时间都等到了,哎,所以接下来是不是就是八点之前的数据都应该到齐了,那这个时候你如果create还没来,是不是我就有理由相信你真的就丢了呀?所以大家看这就是这种用法啊,我直接利用了watermark的那个延迟,这里面直接注册一个当前时间戳的定时器。
22:07
诶,那大家注意当前时间,戳这个就代表马上要要出发吗。不一定,因为water有延迟对吧,如果这里是乱序数据的话,它的这个时间戳是不是应该比当前的watermark要是不是要大呀。大家想就是我当前这个是八点的数据,其实它是乱序数据,那是不是应该沃马比他要滞后,哎,所以是不是当前可能才在07:5519分对吧?诶,所以这样的话我就可以等待了。啊,那同样我要更新状态对吧?更新状态这里边我们首先是timer ts state做一个update,那这里边我就没有保存那个TS啊,直接把这个再复制一下放过来就行了,另外现在既然是已经支付事件来了,那是不是is paid的这个state要做一个更新,更新成处啊,啊这就是我们完整的一个处理逻辑,大家看到就是这种各种if else对吧?啊,那这个处理完成之后,我们看一下这个是这个是process element,后面还应该有一个是不是定时机触发的时候有一个on timer啊,那这里边有两种不同的情况,大家想我前面是不是有两种情况都会注册定时器,就是如果配来了create没来,我要注册一个等create,那如果create来了配没来,是不是也要注册一个定时器等配啊,那当当前到底是谁没来触发那个定时器呢?诶这是不是要判断了,对吧,定时器触发。
23:35
定时器出发是不是说明一定有一个事件没来啊,对吧,诶,那所以接下来我们其实就是要要看谁没来啊,大家想如有没有可能都没来。都没来就没有定时器,对不对,那有没有可能都来了还触发了。不可能,是不是如果都来了的话,我们一定要删啊,哎,所以这个逻辑我们已经限制死了,对吧,只要他触发一定是有一个没来,那所以我们就看大家想到是不是,就看谁来了谁没来就行了呀,哎,所以这都在我们那个状态里边吧,我就直接判断吧,如果是配事件啊,它的这个value配事线如果来了的话,大家想如果pay来了,是不是说明create没来啊,对吧,Create没来,那接下来怎么办?诶我们就直接做一个报警这个,当然这个报警稍微有点ctx output对吧,测殊出流报警这个稍微有点特殊,就是因为我们还是给一个这个tout啊,但实际实际上它它不是订单的那个支付tout,它是那个。
24:43
创建下单的那个事件没来,我们不等了,对吧,Time out了,所以这个稍微特殊一点,我们new一个order result的时候还是单独做一个说明,当天还是奥特ID怎么取呢?啊,现在没有那个value了,但是对有上下文啊,当前是不是current key,就是order ID,然后接下来给一个说明,当前就是paid but啊,没有找到not found created对吧,Created log啊,这是我们说一下没有等到它的那个created事件,那另外这里的else大家想是不是就相当于是对如果pay没来,那是不是就是直接这是真正的那个支付超时啊,诶,所以这是我们真正想要找的那个啊,15分钟他没来,还是order timeout放在这儿,你有一个order result里边cx get current k,给一个timeout,这是我们最终数出的结果,对吧?啊,当然了,最后是不是还要再做一个。
25:47
状态的清理啊,既然已经结束了嘛,所以最终清空状态啊,我把这个上面的直接copy过来做一个清空定时器,就不要删了,因为已经触发了,对吧,触发就相当于闹钟已经响了嘛,我们直接把前面的状态都清空就完事了,这就是一个完整的处理流程啊,大家可以运行一下,看一看这个效果怎么样。
26:15
看一下这个执行的结果啊,这个大家会发现代码是比capp好像复杂了一些,但是它其实是不是里边可以控制到很细节的地方啊,很多具体的逻辑我们都可以做一个做一个实现啊,所以大家看到这里边它输出的就多了几个,大家首先啊,你看到这里边有一个34767,之前我们不是它其实最后是有支付的吗?诶,所以它输出的是paid but already timeout对吧,支付了,但是已经超时了,然后那个34756大家知道是本来就没有嘛,后边就没有那个支付对吧?所以直接就是正常tout啊,然后另外还有一个34768,大家看它是配的,但是没有找到create,诶大家想这个之前我们是不是就真的找不到啊,这种事件是真的匹配不到的,因为大家看它是不是只有配事件,没有create事件啊,哎,所以这个我们是检测不出来的,现在我们也可以做一个输出报警啊,就是做一些精细化的控制,用process方式可以实现。
我来说两句