00:00
在这里边定义了,又是在process这个方法里边自己定义了一个process function,那这个process function又应该是什么呢?大家可能想到,诶之前做过那个keep buy啊,肯定应该是kid process方式,这里大家要注意,我们现在是connect之后的,得到的是一个connected streams这样的一个特殊的数据类型,对吧?调的是这个connected streams里边的process方法,那大家要看它里边要求的这个数据类型是什么样的,对吧?诶这里边大家看到我这里面如果做过KY的话,里边是什么?是一个key的Co process对吧?其实是这样的一个东西啊,或者我可以直接实现一个什么,大家看上面还有一个这个可以直接实现一个Co process function对吧?啊,直接按照这个去定义就完全没问题了啊,所以这里边我们其实最终之前大家会想到就是process方式,我们说有一一大个家族,对吧,各种各样的这个,呃,这个流。
01:00
这里边我们去调process方法的时候,都可以传不同的process function,这里边我们又接触到一个Co process function,这就是他要处理两条流里面的数据,对吧?啊,这是这一部分,那接下来我们就是给大家把这个内容做一个具体的实现了。还是在这个下边class把这个做一个定义啊,Transaction pay match result啊,然后这里边我们extend,这里边我们定义的是一个K的Co process方式,对吧?呃,然后这里边呃,当然在这个过程当中,大家可能会发现就是说呃,你假如说直接给这个,呃,就是说给这个呃,Co process function其实也是可以的,为什么呢?呃,因为就是当前啊,我们这里边如果说你前面已经把这个做过这个KB之后啊,这两条流连接的时候,它就已经是按照这个K连接在一起的那个状态了,对吧?所以你这里边就如果不做这个K的Co process function,直接给一个Co process function也是可以的。好,那你如果要是前面我们这里边是connect之后,然后又做了一个key by的话啊,那接下来你就必须要去实现一个k process function了对吧,啊,就这里边我们不实现这个直接。
02:21
跟调这个Co赛方式也是也是一样的啊,也是没问题的,因为前面我们这里边P是两条,我们这里连接的是两条P的stream嘛,对吧?呃,就是后面我们并没有做再去做key操作,好这里边大家看这个Co process function比key的Co process function就少了一个key的类型定义,对吧?啊这里面就是INPUT12OUTUT啊就对一个输入一个输出,那里面我们是两个入一个输出,对吧,跟这个一般的function就不一样了,那这里边我们第一个输入什么呢?样例类order event,第二个输入还是样例类,诶,这是result event对吧?最后的输出类型,哎,我们说如果要是匹配上输出什么呢?哎,我们说就把它包成二元组不就完了吗?或者你想单独定义一个样例类也行,对吧?哎,这里边我们就是不说这个了啊,直接把它定义好就完事了,二元组包在一起一目了然,把它这个放大一点,里边大家看一下需要实现的是什么呢。
03:21
大家还记得之前那个process方式里边要必须要重写的一个方法叫做process element,对吧?每来一条数据调用里边的process element,现在呢,重写两个方法,这两个方法一个叫做process element1,一个叫做process element2啊,非常直白,对吧?什么叫一国两制,这不就是吗?啊,就像我们之前那个Co map方式啊,Co flat map方式是不是也得传两个啊,两两个处理的那个函数,我们这里边它就直接process element1里边,大家看这里边的数据是不是就是order event呀,对吧?第一条流流里边的每一个数据来了之后,按照这个process element一来做处理,然后第二条里边的receipt receipt来了之后,我们用这个process element2来做处理,那为了方便后边我们看的更清楚一点,我直接改个名,这个叫什么,这个我们当前需要的是不是就只需要当前的那个支付的那个事件呀,配的那个事件对不对,哎,我就直接把它叫做配。
04:22
好了啊,然后后边这里边我就直接叫做re received对吧?改个名诶,不是叫这个received event就叫做re receive好了,然后这里边就还涉及到一个问题,前边我们在做这个就是流的这个定义的时候,是不是还应该做一个简单的过滤啊,因为之前这个order event里边我们并不是所有的数据都有transaction ID的,对吧?而且我们并不是所有数据都要你像那个创建订单你就根本不需要嘛,修改订单不需要对吧?我们只需要支付订单,而且也只有配的那个事件才有传单ction ID,呃,所以这里边我们需要做一个过滤啊,上面做一个filter,然后里边我们要求的就是当前的,呃,比方说我要求当前的这个传单品ID不为空,对吧?或者说我要求当前的那个even type必须得是得是配对,这两个其实是一样的啊,这当然就是更好的方式。可能是要。
05:22
走这个传载市场ID不为空对吧?啊,因为假如说出现这个异常啊,配了之后没有拿到那个传场ID的话也可以解决,但大家知道这里边我们的逻辑语义其实是一样的含义啊,这样的话就保证这里边来的事件肯定是配了对吧?啊,就所有的只有配事件才会调到这里来,然后后面我们做处理的过程当中,大家想想到底具体怎么去做处理呢?那我现在来了一个配到底要干什么呢?是不是这就跟我们之前在做那个,呃呃,在处理那个就是订单超时的时候,Create配那两个事件的判断一样啊,我来了配事件之后,是不是我就要看有没有同样一个传达ction ID啊,就当前同组的对应的那个receipt来呀,对吧,我得判断一下这个它来了没有,同样的receipt来了之后的调用这个press element2是不是也得看一下有没有同组的那个配来了,对吧?如果已经来了,现在我另外一半也来了,匹配上直接输出结果,对吧?如果要是没来怎么办?
06:22
等对,那怎么等注册定时器对吧?哎,这个处理流程就跟之前为什么要单独再给大家讲一下这个process function式的那种实现呢?就是跟这里边的处理思路是一模一样的,只不过这里边我们在这个抠process function里边,大家看这个处理逻辑大大简化了,为什么?因为他把这个if else已经给我们用两个函数分别包分开了,对吧?我们前面做的if else判断,你不用做判断了,它就是一国两制嘛,来了配就近这,来了IP就近这啊所以接下来我们还是按照之前的那个定义先定义状态对吧?哎,对,呃,懒加载定义状态啊,定义状态我们想要定义什么状态,那是不是需要一个啊,就是是否来过这个pay的事件,然后还有一个是否来过IP事件,对吧?哎,但是最后因为我们这里边做了一个偷懒,就是直接就是把这个当成二元组输出了,那你如果只有一个出false的一个值的话,那你不知道当时那个事件到底是啥,对吧。
07:22
哎,那我们这里边不要定义那个就是布尔类型的变量,我直接按照这个样类型定义一个value state对吧?把整个纸保存下来不就完了吗?也表示它来过了对吧?哎,所以这个就非常简单啊,我们定义这个状态保存,用于保存当当前交易对应的,呃,支付事件就是订单支付事件和到账事件,诶这就是如果来了之后就存到这儿啊,所以接下来我们LA定义一下啊啊,那首先是应该有一个pay event state,它应该是一个value state,我们把这些引入它的类型是什么?All event对吧?样例类类型啊,所以后面这个定义一模一样啊,Runtime contacts,然后我们get state,对吧,New。
08:22
一个value state script描述器,然后里边给一个名称,哎,这个我就直接叫pay,然后class of order event,然后接下来是不是非常类似啊,对吧?定义一个receip event state,哎,也是value state里边注意这个类型就是receipt event对吧,样例类类型,然后同样get runtime contacts get state里边new有一个value state script,现在是receipt类型啊,然后我们里边给一个receipt名称,Class of receipt event,还是这样定义,对吧?哎,把这个定义好就完了,然后大家记得之前我们还有一个定义的状态,就是要删除定时器对吧?啊,所以要有一个那个定时器的时间戳,那这里边我们到底删除不删除呢?这里边我们偷个懒,干脆就不删除了,为什么呢?我直接可以,因为没那么复杂,对吧?
09:22
啊,这它是分别在两个这个分支这个函数里面去做操作的嘛,那我就是如果就判断另外一个来没来,如果来了的话就直接输出,没来的话我就直接等着对吧?呃,就是来来了来了之后呢,我直接输出,也不用删除定时器,那怎么样呢?定时器到点的时候啥都不干不就完了吗?对吧,这个其实也是一种简单的方式啊,可以把我们这个代码做一个简化,那所以接下来我们就不定义那么多了,接下来就是process element一看看这个怎么实现吧,现在来了一个配事件,那是不是要看对应的,哎,这是订单支付事件,订单支付事件。
10:04
来了,要判断之前是否有就是对应的到账事件对吧?到账事件诶,因为这个没准嘛,既然都是两条流了,那这个乱序就更更乱了对吧?呃,谁等谁都是有可能的,所以接下来我们先把它拿到啊,Re event,呃,我我们不要叫receipt event了,就叫receipt吧,对吧,我们直接从那个receipt event state里边拿它的值value拿到,然后接下来呢,判断一下,大家知道它本身是一个当前里边的这个类型,是一个reip嘛,是一个对象,呃,本身拿到应该是个对象啊,如果说里边没值的话,那就变成none了对吧?哎,所以如果不为空的话,那说明已经有了,如果已经有receipt,已经有这个到账事件,那么怎么样正常输出匹配对吧,然后另外清空状态。
11:04
哎,所以这里边我们接下来就是out.collect这是正常输出,我们要的不就是一个二元组吗?这个非常简单,K receipt,我们都不都不都已经定义好了吗?对吧?当前输入进来的是配,然后这里边是receipt,对吧?拿到的是receipt输出就完事了,然后清空状态,清空状态大家可能会想到我其实只要清空这个receipt event就可以了,对吧?啊,当然如果要是有同学想就是简单粗暴的话,我把两个都清空也是OK,因为之前其实这个配没来过嘛,对吧,你现在才刚来这个配,那之前这里边肯定没东西啊,所以你想清空也是一样的啊,然后接下来那就是else else啊,哎,这就是如果还没来,那怎么办呢?哎,那就是注册定时器开始等待对吧,定时器开始等待,哎,那这里边大家会看到啊,Ctx timer service,然后。
12:04
Mr even time time对吧?哎,那我用什么时间呢?当然是用当前的这个pay里边的time stamp对吧,乘以1000这个时间戳注意这就涉及到一个我到底要等多久,哎,那等多久,大家想这个怎么去定义呢?其实主要就看这里边的这个数据了,对吧?因为现在两条流这个真真说不准,这个没准对吧?啊对这这就还是说我们看那个最大时间差对吧,就是你大概的看一看两条流里边啊,就是同样的一个ID,然后在那边这个时间是多少,在这边是差多少,呃,如果说这边比那边就是提前的话,那就那就算了,这这无所谓对吧,如果说要是比那边,比方说呃,大三的话,我就等三秒对吧,如果大五的话,我就等五秒啊,这个就可以自己按照自己的这个需求啊,按照我们判断这个数据的特征去做定义啊,那所以这里边我我我就不详细再去看那个数据的情况了啊,我们直接给一个比方说等五秒吧。
13:04
加一个5000对吧,放在这里,所以这里边就是开始等待五秒,我直接把这个写进来啊,然后另外大家注意还有一个事情没做,干什么得更新状态对吧?因为你现在状态编程嘛,状态在不停的改,你现在既然是这个配已经来了,那那个配里面是不是得把它塞进去啊,对吧?所以这里面更新状态,更新状这里边pay event state update,把当前的pay放进去就完事了,这就是所谓的这个处理的过程,对吧?啊,那大家会想到现在是配来了,呃,那个在等那个receipt,那假如说是receipt来了呢?那现在这个process载里面的二对吧?哎,大家看这是不是完全对称一模一样啊,哎,所以这个我就直接照抄就完事了,对吧?直接照抄,然后把这个直接拿过来啊,然后里边要改一改,对吧,现在是什么来啊,到账事件来了对不对。
14:04
这里边到账事件来了,要判断之前是否有啊,就我直接简单写啊,是否有订单的那个支付事件对吧?Pay事件,所以现在我就不是receip了,我要拿的是pace state里边的那个东西对吧?然后接下来判断的就是pay,如果不等于那样的话,如果已经有这个配的话,正常匹配对吧?输出是不是还是配receipt,哎,这俩就还是放在这儿啊,因为我们定义的那个配是在前面的嘛,然后边清空状态对吧,一模一样,然后如果还没来的话,比方说这里边我这个两条流可能分别等的时间不一样对吧?啊,它比他这个慢的可能是五秒,这边反过来可能不一样了,比方说我等三秒啊,这是还是看数据啊,具体数据是什么样的,我们定义好,这里边就加三三千,然后这里边注意用的就是对receipt time stamp加起3000,然后注意更新的是对receipt state,然后把这个receipt更新进来,原来看就是这样的一个处。
15:04
流程对吧?啊,一点问题都没有,然后最后还有一步不要忘记定时器对吧,因为你前面不是要注册定时器吗?诶那这里面大家可能就会有一个问题,这里边这个定时器出发的时候到底要干什么事儿呢?定时器出发它到底会怎么样,在什么情况下出发,因为前面我们没有删除定时器对吧,所以在哪种情况下都有可能出发啊,只要你那个注册了对吧?啊但是注这个到到时候触发的时候呢,还有几种情况,就是假如说我已经成功匹配上的话,大家会想到这里面有什么情况呢?状态是不是就都都清空了呀,对吧,如果状态都清空,那是不是表示我就啥也不用做了,对吧?接下来你就直接这个你你既然是触发触发就出发呗,啥都不做,你最后啊,最多我们我再来一个清空状态对吧?啊这这个就完事了,然后呢,更加常见的情况是什么呢?是这两个状态里边应该有一个没被清空对吧,有没有。
16:04
可能两个都没被清空呢。不可能两个都没被清空,大家想是不是对,肯定第二个肯定有先后嘛,对吧,那第二个来的时候是不是肯定就输出,然后就清空了呀,所以接下来就是可能有一个没被清空对吧?有一个没被清空的话,那代表什么?是不是就是另外一个没来啊对吧?就是没被清空的那个状态,他来了一直在等,等到定时器出发,另一个还没来,所以接下来我们其实就是判断什么呢?其实就是要定时器出发的时候啊,定时器出发判断状态中啊,哪个还存在对吧?还存在就代表另一个没来啊,所以接下来我们的判断就是if判断什么pay even the state这个value啊,如果它存在不等于none的话,那是不是代表就是那个receipt没来啊,对吧?哎,那所以这里边我们定义一个输出吧,这个输出到哪。
17:08
哪呢?我们现在不能输出到主流,主流是那个配对的呀,那大家想是不是输出可以输出到测输出流对吧?哎,我们定义一个侧输出流,这里边我们定义这个输出到侧输出流,相当于做了一个报警提示了啊,就是诶这两个不匹配对吧?你接下来要做一个处理了啊,那这里边我们得定定义这个测输出流的话,我们还是在前面定义一个标签吧,大家还记得这个对吧?这测输出流标签,呃,我们直接定义啊,比方说这里边首先这这两个可能还不一样,一个是一个是这个配来了,呃,Reip没来,另外一个是re来了配没来,对吧,因为它里边输出的这个数据类型可能不一样,我们定义两个不同的标签,定义两个不同的测殊出流啊,那所以这里边我先给一个,呃,这个没有匹配上的配order pay的事件,这个我叫做on。
18:08
Match on matched pay event output t,对吧?给这么一个表现,尽管有点长啊,但大家一看就知道对吧?没有匹配上的配饰件啊,它是一个凹凸T里边的类型是什么?应该是。没有匹配上的配,是不是就把配的那个类型输出就完了对吧?哎,那配的类型是order event对吧?哎,把它这个定义出来,然后这个我们叫all matcht pay对吧?然后接下来同样我还可以定义一个on matcht receip event output tag,然后这里边我们又出来output tag啊,然后这它里面的类型当然就是receipt event了啊这里面同样我定一个on matched receipt,哎,这两个先在外边定义好好然后接下来我们接下来输出的时候,那这个就简单了,对吧,Ctx测输出流嘛,啊然后on matched,这是这是这个配不为空,所以是on matched配对吧?呃,这个把这个标签传进来,然后呢,把当前的pay event state value直接传进去,是不是就完事了,大家想想是不是这样对吧?把当前这个状态拿。
19:28
出来,这不就是他来了没来嘛,呃,所以他是一个没有匹配的配配事件对吧?把它输出到特殊物流,然后另外同样还有另外一种情况啊,那就是if这个同样的对吧?如果receipt这个点VALUE6,它如果不等于空的话,里边有值的话,那是不是ctx output,大家看这里边处理就是完全对称对吧?哎,直接把这个receipt它里边的这个值放到这输出就完事了啊,那最后如果输出完成之后,我们还可以做一个清空状态对吧?就不管他前面输出不输出,你就算啥都没做,那我清空状态也最多就白清空一次对吧,也没什么影响,所以直接把这个清空就完事了。
20:14
这就是一个完整的处理流程啊,啊,当然前面如果说我们要想这个输出测试的话,你是不是这里边还需要再做一个定义啊,对吧,不光是这个reip receip,这相当于是什么呀,这输出的是成功匹配的数据对吧?这里边是match的数据啊,然后接下来我们是需要这个result stream里边去get set output,哎,那那这里边稍微麻烦一点,我没有定义在外边,对吧,我把这个去啊。这里面你有这个tag进去,然后再print输出当前是啊,我们随便写啊的对吧,然后后边同样是不是这里面还应该有一个对吧?这里边中间这个我们还是直接copy这里边的这个tag啊,防止写错,因为这个有点长了,尽管大家看着长,但是知道这个含义是没什么区别的,对吧?啊,这里边直接把这个定义好就行了,On matchceip,这就是一个完整的流程。
21:26
好,我们现在运行一下,大家看看输出的结果是什么样的啊。我们看一下运行的结果哦,这里边大家看输出了大量的这个信息对吧,大部分这里面给的信息是match的匹配起来了对吧?然后接下来大家会看到下边有UN matchched receipt啊,就是这里面是什么呢?就是有这个receipt,但是没有对应的那个,呃,Order pay对吧?哎,这里边就是一看就是只有一个,没有另外一个我们检查一下order里边啊,我搜一下这个这个诶果然没有对吧,确实是没有,那或者下面还有这个的,就是大家知道这个,呃,34731,这个相当于应该是在我们的对应的这个receipt log里边应该没有出现过,对吧,我把这搜一下,果然也是没有啊,所以这个大家就能看到这里边的一个具体得到的结果啊。
我来说两句