00:00
上节课我们已经做了一个简单的状态编程的实现,那这节课呢,再给大家在之前的基础上做一些改进,主要我们想要改进的点是什么?就是不希望它这个输出的很慢,一定要等到那个定时器触发的时候才输出结果对不对?因为开out的结果的话是必须在那里输出,那如果说是对successful的这个结果成功匹配的结果,是不是应该是来一个create和配配对匹配起来我们马上就应该输出啊啊,但是如果要大家想到,如果要处理这种情况的话,那就得处理不同的情形了,因为有乱区事件吧。呃,这里给大家我们就在这个基础上做一个简单实现吧。呃,那大家会想到下面这一部分就都不重要了,对吧,这部分我们就另外去实现别的这种方法,呃,在这个里边,如果说要去做一个改进的话,可以把我们。
01:00
之后的那个结果是不是可以区分开啊,大家想,大家想一想,是不是我可以把这个正常匹配成功的结果放到主流里边,更好的一种方式就应该是正常的放主流,然后不正常的是不是可以把它对提取出来放到一个测试主流里边啊,所以这里边我们定义一个测试出流吧,还是在上面我们在另外边直接定义测试出流标签对吧?这里边all the time out output。Tag,你有一个output tag,这里面的类型当然还是all the result对吧?这个我们就不单独定义了,大家如果想单独定义一个,比方说warning类型也可以,这里我们就直接用order result就好,给一个名字叫order timeout。啊,然后下面我们可能要去做一些调整,做什么调整呢?而这里边可能就不是说去呃,直接做这个呃,Water,呃,那个order timeout warning了,我们这里边就是什么呢,相当于是要检测所有的数据,而且最后那个成功输出的数据是不是也在里边啊,对吧?所以其实不是简单的一个warning啊,这里边我们就叫order result。
02:25
Stream基于order even stream做一个process处理,这个名字我们叫做order pay match吧,就是要各种匹配,对不对,不同的情况,Create和pay要匹配起来,然后做结果的输出,呃,后续的这个输出结果呢?那当然我们知道就要all the result直接输出,这个应该是主流,是不是应该是已经成功支付的情况啊。另外all the result可以在get,当然这个前面有错,所以说这个不能直接输出啊,我们先把它拿出来吧,Get set output里边的那个,呃,时间那个,我们的那个tag应该传前面定义好的all the time out。
03:14
Timeout output tag,最后再做一个打印输出,这里边给的是timeout的这个时间,对吧?啊,当然了,这里边我们给的这个打印是叫t Mo的,其实我们可能是把所有的异常情况都输出到测试出流,对吧?啊,这个就大家自己定义,你可以定义到不同的特殊出流。呃,最后我们执行,然后接下来要实现的,因为要共用那个timeout tag的话,我就直接在这里实现了,对吧,在里边去实现class order pay。Match,那这里边实现的还得去实现一个什么接口呢?还是K的process function,这里边KIO是不是还是类似啊?我们通过order ID做了一个key by,所以说key long类型,输入的类型是order event,输出的类型是all result,这个完全一样啊。
04:13
然后必须要实现一个process element的方法,那在这个里边我们可能需要的大家会想到需要的状态可能就稍微多一点了,对吧,那之前首先之前定义好的那个1PAY是不是还需要对吧?判断我们那个这个之前是否已经来过这个配的状态,这个应该在在外面定义啊lazy,然后接下来除了这个之外还需要什么呢?大家会想到这个this pay表示配是否来过了,另外我们是不是还需要知道create是否来过啊,啊,对应的可能还需要一个表示create是否来过的状态,还需要什么呢?诶在这个过程当中,如果我们要是说大家会想到,如果要是实时的检测到它的那个create和配已经匹配起来,我们输出匹配成功的那个信息的话,之前定义的那个定时器是不是就不要再去出发了呀?
05:11
哎,那是不是我们中间应该还要删除定时器,删除定时器是不是就应该保存定时器的时间戳啊,所以还需要保存一个定时时间戳,所以这里边其实我们需要这样三个状态,这里边我们做一个简单的一个规定和优化,什么优化呢?就是大家会想到这个定时器的时间戳,假如一开始就是没来定时器时间戳是零对吧,如果是空的时候。是不是就表示create没来啊,大家能想到对吧,所以在这种情况下,那其实我们是不是只要有一个time,这个timer的那个时间戳保存到状态就可以了,通过它也能判断到底来没来,对吧?所以这里边我们保存。定时器的时间戳为状态,所以这里边啊,当然我直接把这个复制过来,我们改一改啊,这里就不叫is paid了,这里叫,比方说我定义一个timer,呃,Timer state就叫。
06:19
当然它类型也不是布尔类型,而是long,对时间戳后边也得改,都得改对吧,Long类型,注意这里边的名字是不是也得改啊,你不能重复名字啊,这里边叫timer state。最后的类型也得改是一个了,对吧?啊,所以这个改完之后不报错没问题,接下来我们就是具体处理这个逻辑了,首先是不是先拿到当前的状态对吧,先读取状态。呃,首先我们还是定义一个e pay的,它是从EPA state里边拿出来的value,另外定义一个啊,Timer对吧。
07:06
啊,Timer时间戳TS,它是timer state里边拿出来的value啊,这是我们的第一步,然后是不是就是if做判断了,对吧,根据事件的类型进型分类判断。做不同的逻辑处理,处理逻辑啊,所以首先if if什么,哎,这里边是不是首先要判断我们来的这个type,是不是就考虑两种情况,要不是create,要不是配对不对,所以首先这里我们判断create的情况。那大家自然会想到后边是不是会有else。啊,当然这里面应该是else if对吧?Else if value.even type等于配对,这里边首先是这么两大块,哎,这里面写错了啊。
08:19
好,我们把这个先定义出来,这个整体的逻辑先定义出来,然后在具体的里边,大家会想到接下来需要做什么判断呢?如果是create事件是不是接下来是要判断是不是有pay来过啊,对吧?呃,接下来判断pay是否来过,那么这个标志其实就是我们的is paid对吧?这标志位is paid。当然这里边有if有ELSE2个逻辑处理对不对,那这里边那那大家就看到其实就是这几种情况嘛,对吧,分别if else各种判断,那如果说已经配过的话,这相当于什么呀,是不是已经匹配成功了,对吧?所以这里边就可以直接输出结果了,然后取消定定时器,然后这个清空状态是不是就完事啊,所以这里边如果如果已经。
09:26
配过,呃,那么匹配成功输出。呃,输出主流数据对吧,然后清空状态,所以这里边我们要做的,当然有同学说,诶你这里边还需要加一个逻辑,就是说你得判断它超不超时了呀,对吧,那那万一说这里边这个在这里边来来了这个配的时候他已经。就是来了这个create的时候跟配的那个时间都已经超时了呢,呃,这种情况其实比较少见,因为这种情况相当于是什么呢?你是先来了配,然后它是CREATE15分钟之后的一个时间戳先来了。
10:15
然后15分钟之前的一个create再来了,而且你还要在这个过程当中,相当于什么,我们还就是大家会想到这个相当于那个,呃,就是在这个过程当中,你这个延迟数据都得处理,对不对,你相当于你事件得处理15分钟的这个数据都得存着,都得去保留之前的状态,这个其实实际情况你如果要有这么大延迟,15分钟延迟的话,嗯,那可能就不用做实时了,对吧,干脆你就做这个呃,离线的批处理就好了,所以这种情况我们这里就不去做考虑了啊,就默认不可能说这个配的来了之后,如果来了这个先先来了配,后来了create,居然还超过了15分钟的这个时间距离啊,这个情况我们就不考虑了,大家如果要考虑的更复杂一些的话,可以加进去啊,再加上各种各样的if else,对不对?继续去细化,那这里面直接输出的话就是out.collect all result。
11:14
里边的包装当前的order ID写在这儿对不对啊,那么这里边输出是不是就是配的successfully。好,当然了,后边我们需要去清空状态,这里清空的时候先要去把type service是不是要delete呀,Delete even time timer这里边是不是把这个timer的传进去,另外还要清空is state clear,然后timer state clear,就是如果已经匹配成功的话,两个都来了的话,后续啥都不干了,对吧,定时器也别触发了,然后状态全部清空完事,我们现在这个彻底搞定,这是一种情况,这是。
12:02
呃,第一种情况对吧。这里相当于是第一种情况是,如果是create事件,然后这里是1.1,对不对,然后接下来大家会想到这里是不是1.2啊。1.2,如果没有配过,那怎么做呢?那是不是就要注册定时器,要等待了,等待。配的到来,所以这里边啊,那这个要注册定时器是什么呢?是不是得是15分钟之后的那个时间点啊,以当前value的那个事件时间,以起始为为起始点,然后15分钟之后对吧?所以这里边我们可以先定一个要存的这个时间戳,这个计算还有点长,所以我们单独写出来value点。Time乘1000对吧,然后要加上15乘以60乘以1000,对,所以接下来注册定时器time service register time timer ts对吧?另外注意是不是还得保存状态啊。
13:16
因为我们是不是已经定义了是这个定时器了,所以接下来定时器的状态要update ts对吧?把它存到状态里面去,哎,这就是这样的一个过程,来有有什么问题吗?来两个啊,对,就是咱们现在处理的状态都是一个create一个配,不要纠结这个两个create的情况了,对吧?你如果要是想要判断这种复杂的情况的话,那有同学可能说那中间如果要订单取消了呢?对吧,你如果要是出现订单,其他的一些操作呢,之后不能再配,或者说不能再create呢?啊,这里我们暂时不处理那么多复杂情况,因为大家会想到这种条件的话,这就是根据我们本身的业务逻辑定义有关了,对吧?啊,你如果要是定义有各种各样的复杂的情况,我们当然可以各种各样if else去判断,这里只是给大家把这种一个create一个配的情况给大家完整的在状态编程里边用,这个就是能够。
14:16
不理乱序的情况,把各种不同的情况都分析到,做出一个输出,所以大家不要太纠结于一些特别的细节啊,因为平常我们可能不会用这种方式去处理真正的这种事件序列的判断,对不对啊,有更简单的方式啊。好,然后接下来我们要判断就是第二种情况,那这就是如果是配先来了,对吧?啊,如果是如果是配事件,那么就去。那么继续要判断是不是判断是否来了呀,对吧?呃,是否create过,那用什么来判断create过呢?哦,这里边我们是不是可以用。
15:05
定时器对timer表示对吧?所以这里边我们判断的是是不是如果timer ts大于零的话,是不是代表之前已经来过对吧?所以这是2.1,如果有定时器说明。已经有create来过啊,所以那现在应该做什么操作呢。哎,现在我们可能就得去判断一下,既然是有create又有配,现在是不是就得判断那个时间戳了,因为是因为是create线来配,后来的话,这个就很有可能是15分钟之后的一个时间才来的,对不对啊,这个是完全有可能的,这是正常的这个时间去,呃,输出的一个结果嘛,所以我们之前不考虑这种情况,是因为乱序,我们不考虑这个乱序到15分钟以上的那种情况,那这里边你正常排序的话,当然有可能来了一个15分钟以上的一个配,那这里边就要判断一下。
16:14
继续判断。是否超过了timeout时间?哎,所以这里边要继续判断if。呃,依附什么呢?是不是timer ts,如果要是大于value点。Time当前的时间。哎,大家会想到什么?是不是这个还要再乘以1000L啊?哎,这这里大家需要注意的一点是什么呢?这里是如果已经有,呃,就是有定时器的话,说明是那个create来过对吧,那create的时间是什么呢。
17:05
Create的时间是不是就应该是当前定时器的时间啊,对吧?所以我们现在要判断的是什么。定时,哎,大家想我现在要判断的是定时器时间要要是哪个应该大,哪个应该小,我们现在要判断的是正常匹配的这种情况,对吧,定时定是定时器要大,还是还是这个value,新来的这个数据时间要大呢?时间小对是不是新来的要小,定时器要大才是正常匹配的一个情况啊,哎,有些同学说那后面我应该还要加上那个15分钟加上900。呃,900再乘以1000,要不要加。大家想一下。我其实要判断的其实是当前的这个时间,如果你要加这个的话,是不是应该是判断当前我新来的这个pay的时间和create的时间相比,是不是差别在15分钟以上,对不对?当时我们的这个time timer定义的时候是不是已经加过这个900了,而且大家会想到这里其实不是这个要加,你如果要判断也是这个要减,对不对,对吧?因为当前时间应该大嘛,当前时间大它应该减去在跟之前我们呃那个create的时间去做比较,而create时间在这个定义time的时候已经加上了,所以是不是直接比较就可以了,哎,所以大家是这样啊,就是如果说time比它大,那所以出现这种情况,就是定时器还没触发,现在来了这样的一个配,对不对,所以这里边。
18:54
对,这里面相当于又是2.1.1对吧。如果。
19:03
呃,这个定时器时间还没到。那么输出成功匹配对吧?直接输出,所以这里边直接out.C呃,然后这里边有order result,当前的K是不是还是value.order ID,哎,输出一个配的successfully。好,然后大家会想到,那如果要是说它比这个小了的话,怎么办呢。2.1.2如果。当前数据的当前其实就是当前配对吧,当前配的时间对已经超过时间,已经超时,那么对输出到测数出流,哎,所以是这样一个逻辑对吧。
20:13
呃,所以这里边我们可以去Ctx.output对吧,这里边要传入我们的tag output out type out output tag,然后给一个,哎,这里边得是那个order result对吧,包装好还是value.order ID后边跟上一个,诶这个情况应该叫什么?是不是就叫超市,诶这个应该是叫这个应该是太out了,但是这里边其实是等到了这个paid的数据,对不对啊,所以我们可以叫paid。But already。对吧,你可以有不同的这个输出的情况吧,把我们不同的这个分支都表达出来,好,然后这里边在外边大家会看到啊,呃,在在这里边我们输出了没有清理状态对不对,因为不管是输出到主流还是输出到侧输出流是不是当前都是有create有配,都已经判断结束了,最后是不是都应该把所有的状态清空了啊,所以这里边还是一样啊,前面该做的这些事情,状态全部清空。
21:23
定时器和状态全部清空,输出结束清空状态。好,这是我们这个else第二个情况里边的2.1的这个分值对不对,那当然还有2.2,还有else。那else,这这是什么情况,这是如果这里面没有定时器,说明是没有create过,对不对,说明是。是配先来了对吧,配先到了啊,那么就怎么样呢?呃,理论上我们是不是如果配先到了的话。
22:04
就相当于是那个呃,就相当于已经这个本身这个这个呃P应该是已经丢掉了呀,如果要是说事件发生的时间,就是按照顺序一个一个输入的话,那就应该是街道配的话,Create应该在他前面,如果要没收到,那就应该是已经丢掉了,对吧?啊,但是这里边我们还会有一种情况是乱续数据对不对?跟大家想想,乱续数据是不是我还应该等一等create呀,那这个怎么等呢?首先既然是配先到了,我是不是可以把那个状态先更新一下啊,对吧?这个先更新成true,然后。更新状态,另外我可以干什么呢?可以等等一下这个create,那言下之意是不是就还是要注册一个定时器啊,对吧,注册定时器等待create,那大家想一下这个这个定时器到底是注册到什么时候呢?诶CX先把这个写出来啊,C TX time service register even time timer对吧?那这里边我根据什么来注册呢?
23:24
对,有些同学已经想到了,我是不是直接可以注册一个even time timer,直接就用当前的时间时间戳去注册定时器就可以。为什么这样可以?哎,有同学说你这样注册了的话,那不是马上就应该出发了吗?那肯定就就就等不到create呀。注意,这里边我们注册的是time timer。为什么会出现?配先来create还没到呢,是不是就是因为乱续啊,所以当前的这个value even,呃,Even time里边带着这个时间戳来了之后,当前的watermark找到这里了吗?是不是还没有找到啊,所以我们注册一个这个时间点的定时器含义是什么?
24:11
就是要等一等,等到water mark涨到当前的这个时间戳表示的这个时间的时候,看看create是不是又来了,对吧,如果那个时候还没来,是不是真的create可能就丢了呀,对吧?因为create本来应该在它之前嘛,Water rock表示的含义是不是就是之前的数据应该都到了,所以那个时候如果还没到,那就表示真的没来了,所以这里边我们另外把这个timer state update update一下啊,当然这里边还是他的time乘以1000的这个数。啊,所以大家看看啊,这就是我们处理所有的逻辑分支,用这种方式就把它一个一个具体的逻辑都搞定了。哎,另外还没完。还有一个是不是timer定时器触发这个操作没搞定啊,这里面大家其实会发现,其实定时器触发的时候有两种情况,一种是一种是create来了之后等配一直没等到出发了,对吧,因为如果等到了的话,是不是直接就清空了呀,另外一种条条件是不是配先来了,等great等到那个罗马涨到的时候没来触发了,对吧,两种情况,所以这里边。
25:26
还是根据。呃,这个状态的值判断哪个数据没来。呃,所以这里边我们其实判断什么呢?比方说首先就是判断hit state对吧,这里我就不取了啊,直接拿出来判断一下value state,如果说这里边有值的话,它代表什么含义呢?到了对,如果。
26:01
为真为处表示配先到了,没等到。Create对不对啊,但是这种这种情况比较罕见啊,所以我们直接把它输出到这个测试出流output,我们还是放在这个output tag对应的这个流里边,这里边包装成一个order result,那这里边的key从哪里去拿呢?是不是还是ctx get current key后边给一个输出的信息,那这个其实是已经配过了,对吧?啊,这个比较特殊,那还是they paid but,呃,这个是什么情况,Not found create对吧,Create log。啊,所以这是一种比较特殊的情形,那另外还有一种情形,如果要是说配没有的话,那是不是说明这是配来了之后。
27:07
对,Create来了之后配没有来表示create到了没等到。配对,所以这种情况,这就是我们标准的严那个呃,Timeout对吧,标准的超时把它包装成一个呃,Order result输出ctx。同样还是拿到当前的K这里边输出一个order timeout,哎,这就是我们要做的这些处理,当然最后定时器触发完了,那定时器已经已经可以消失掉了,最后还需要把state再做一个清空操作,Timer state。这就是完整的流程啊,所以大家看,如果要是做这样的一些操作的话,呃,尽管我们判断这个在这个过程当中判断的还是比较简单的情形,就是只有一个create,一个配,对不对,大家如果要是考虑多个create,再多个配,其实这个逻辑就又有问题了,所以这里边尽管是只考虑最简单的情形,我们分别按照不同的分支条件,If else,把它都考虑出来啊,就是这样的一个结果。
28:21
好,我们来测试一下吧。那接下来我们提起来之后,还是用测试数据去做一些测试,呃,比方说我们还用之前的这些数据啊。34729,这个四二的时候先来一个create,然后后边又来一个create,对吧,然后接下来我们是不是要配了。34729,嘿,诶,大家看是不是输入输入一条之后马上就输出了配的successful,因为我们在状态编程过程当中是if else里边把每种情况都是来一条数据就判断出来的,对吧?啊,并没有等待对应的那个时间的出发,所以这里边其实是实时性非常的啊,但是大家可能看到代价就是逻辑非常非常复杂对吧?呃,那我们还可以测一测其他的情况,比如说后边还有那个太帽子的情况。
29:14
呃,这里边我再create一个756。呃,然后我可以直接给一个很大的时间戳在哪里,这个二二对吧,Create一下,诶大家看到前面我们没有配过的两一两个730756,是不是直接就太out了对吧?呃,就是即使我们后面再去做处理它,这个也是已经太out的一个状态了,这就是我们这部分代码,大家可以下来之后把这个好好的实现一下。
我来说两句