00:00
好,那接下来我们主要还是要做这个实现这样的一个一个处理的流程了,Process function了,呃,在这里面大家会想到我们可能会有一些需要,就是把一些异常的情形是不是应该输出到测输出流里边去啊,哎,那这样的话,我们还是在前面先把这个不同测输出流的那个tag先定义出来吧,定义。呃,这个侧输出流tag,那比方说这里边我们直接定义两个啊,因为大家会想到有一种情况是在这一个支付,就是我们订单支付流里边,你检测到了对应的一个支付信息,但是没有在那个支付到账那个流里边检测到,那这个相当于是什么啊。相当于是不是有可能,呃,就就是到账这个环节出现了问题,对不对啊,有可能数据更新慢,或者是第三方平台那里边出现了一些问题,这可能是我们需要去解决的,另外还有一个问题是,是不是有可能支付到账这边收到了,但是配的那边就是我们的订单支付的那个流里边没有收到这个信息啊,这就是我们说的,比方说那边出现大量订单请求的时候,那个时候我们没拿到第三方平台,或者说那个当时那个请求太多,拿到的时候超时了,所以就给丢掉,没有写进去,对不对啊,这个过程当中就有可能出现两边不匹配,那他们可能对应的操作处理是不一样的,所以我们可能要分别给他们放到两个测试数流,对吧?啊,所以这里边我们定义一个,比方说一个色殊物流叫onmached pace,那大家会想到这个意思就是说。
01:46
我们两条流嘛,一个是order pay那边,那这个数据,也就是说这是在订单支付流里边找到了,但是没有在对账到账那个流里面找到,对吧?啊,这是一种情况,你有一个output tag,它的类型应该是什么?
02:05
这个时候我们输出的类型对吧,我们输出什么呢?是不是我直接输出,呃,就是单独有的那一个流,流里边的数据是不是就可以了,对我直接输出order event是不是就可以了,现在我们是来了一个order event,但是没有检测到对应的receipt event对吧?所以我看到这个当前的这个订单支付的事件,那我就知道了,对应的那边那个东西没有拿到,这里边可以给一个名字叫all matchched case,同样我们可以去定义一个UN matched receipt。你有一个output tag,这里边的,哎,就应该是receipt even的对不对啊,这条流我们输出的时候就是只拿到了一个到账的,那那个流里边的一个事件,但是没有跟他匹配的order pay的那个事件,对吧,订单流里边没有找到,所以这是两种不同的情况,On match receipt receipt,好,那接下来我们把这个定义为了共用前面定义好的这个output tag,还是直接就在。
03:20
呃,就在这个object里边啊,没函数下边把这个类做一个实现transaction pay match,它要去继承的接口是叫做Co process,大家看是不是有这个啊,Co process方式,尽管我们调取的时候用的是点process,但这里边实现的是一个Co process方式。当然这里面也得有类型啊,大家看一下这个类型是什么呢?啊,一般process方式其实就是一个输入一个输出,那这里边大家看它是不是有两个输入一个输出啊,啊,INPUT1 input2,还有一个out,那这里边它的INPUT1是什么呢?哎,之前我们说过是不是谁连接谁,它就是一,它就是二啊,所以一应该是all the event all the event,那么二应该是receipt event,那最后还有一个输出啊,输出的话我们想的简单一点,我是不是直接把这两个放在一起包装成一个元组,直接输出就代表他俩配对了啊,所以这个大家可以自定义啊,你也可以定义一个匹配成功的时候的一个标准输出,我这里边就简单一点,这是不是也可以啊,大家想。
04:34
直接包在一起,他俩配对成功,所以接下来我们真正要实现的其实就是,诶,大家看它变成了两个方法了,什么方法呢?这个其实也非常的明确啊,就是我们本来process function里边必须实现的不就是process element方法吗?这里边需要去重写两个方法,一个叫process element1,一个叫process element2,啊,所以大家看这就是把我们两条流的事件是不是各自来了之后,第一条流是不是就走这个方法,第二条流是不是就走这个方法,各自都可以去单独处理,哎,那他想,那他们俩之间又有什么关联呢?
05:17
那我们是不是就得去定义一些状态,然后去控制它里边处理的逻辑啊,所以如果我们保存了状态,在处理第一条流里边的事件的时候,是不是可以读到,诶第二条流里边有没有数据对吧?呃,所以大家想我现在应该是保存什么状态,是不是就是保存一下,假如对第一条流里边当前已经有了支付的那个事件的话,诶,我就存一下,然后如果第二条流里边已经有了到账的那个事件的话,我就也把它存一下,对不对?这样另外一条流来了,事件处理的时候,我就可以知道是不是可以匹配成功了,对吧?啊,如果说我现在只有自己来了,那我再等一会儿,如果说对面已经来了,那我们就可以直接匹配成功了。所以简单来讲的话,就是这样一个一个处理思路,状态编程的思路啊,所以这里边定义状态。
06:18
来保存已经到达的,呃,订单支付事件和到账事件。所以这里面我们应该是去定义两个状态,一个叫pay state,那么大家会想到这是一个value state,对吧?里边的这个数据类型应该是什么呢?Pay state,那应该是older event,这里边来了一个配的事件对吧,放在这里,所以它的类型应该是order event。哎,所以这里边我们get run contact,获取状态句柄,然后里边要去又一个de script value state script,好,里边给一个名字叫pay state。
07:20
Order event对吧?哎,这就是我们这样的一个过程,当然后面这一个跟它非常的类似,我可以直接copy下来,然后改一改,这里边就不叫state了,而应该叫receip state,对吧?另外这里边的类型是不是也不是all event了,它这个是来了一个是不是event表示一个到账事件他先来了,所以我们保存成这个类型。这里边也得改receipt,呃,这个注意状态名字也得改,不能叫一样的名字,好,这样的话就把这个想要有的状态先写好了,然后接下来我们就分别去实现ELEMENT1和process element2,对吧,这里边其实是订单支付事件数据的处理好,当然后边这里是。
08:21
到账事件的处理处理。呃,那这里边为了区分,我们觉得稍微看清楚一点,它这里边是统一都叫输入的这个数据都叫value是吧,那我们写的清楚一点,比方说这里就叫配吧,这是我们那个订单的一个支付事件对吧,后边这个就叫到账receipt吧,所以改一个这个参数的名称,大家应该就看得更清楚一点,呃,这里边process element1里边其实就是支订单支付流,事件流里边来了一条数据,那我们这里的实现。怎么做呢?订单事件来了之后,是不是应该先去判断有没有对应的那个到账事件,有啊,那我们判断什么,是不是就直接从状态里面去取啊,如果要有,是不是应该存到状态里面去,哎,所以现在我们的这个过程是这样的啊,就是判断。
09:25
有没有对应的到账事件,所以我们先把这个到账事件先拿出来,Receipt应该是什么呢?是不是从receipt state里边去取,对吧?把这个value拿出来,这个拿出来之后,它的类型是不是就是一个receipt event啊,这是我们之前应该是存过的,那到底有没有呢?我们要判断一下,如果receipt啊,如果不等于nu的话,是不是表示已经来了,已经有了,对吧?如果已经有的话,我们就直接把它在主流里边输出,呃,如果已经有receipt。
10:11
在主流输出匹配信息哦,所以我们这直接就可以out.collect呃,注意主流要的这个数据结构是什么呢?他要的这个记录的结构是元组对吧?就把这俩包在一起,一个二元组输出啊,那我们这里非常简单,是不是就一个叫配一个叫reipt啊啊直接输出就可以了,好,接下来已经输出了,那是不是可以把这个状态清空了啊,这里边既然我们现在还是认为就是一个一个那个传式I,我们现在是以传ID做的key buy对吧,一个传SID,我们默认应该是两边最多只有一个数据对不对?呃,应该是这样的一个状态,所以这里边假如说这里到来的是这个呃配的这个事件的话,那之前配的状态是不是肯定是肯定是空的。
11:12
我们没有做过保存对吧,那这里面肯定是空的,所以我只要清空receipt state把它清一下就可以了,这是我们处理的这个流程啊,清空状态。那如果要是没有呢,还没来呢,对还没来的话,是不是就应该要等一下了,对吧,如果还没到,那么那那怎么等呢?是不是首先我们要把是不是把配药存入状态啊,对吧?存入状态并且干什么呢?是不是要注册定时器等待啊。注册一个定时器等待,所以接下来我们做的是pay,先做一个update,是不是就直接把配放进去完事啊,先更新一下啊,所以我们只在这里边更新,上面没更新过就不用清了,对吧?呃,大家把这个逻辑只要是梳理清楚就可以,然后接下来注册定时器time service,然后register,还是even time timer对吧?那这里边定时器的出发时间到底给什么时候呢?
12:32
那大家想这相当于是什么呀?按照water mark的那个水位的标志,然后要看。对面那条流里边什么时候来对吧?啊,所以这这这种考察呢,大家也得看具体的这个数据了。就是假如说这两条流里边,本来他们的这个实践时间就不一样的话。那那有可能我们本身就得设置一个那边就比这边慢几秒钟对吧?啊,那这里边是不是就得延迟几秒钟再去判断啊,你不能直接说,哎,这个当下如果要没有的话,没有就直接报警了,不是这样的,肯定是要等一段时间,另外其实我们还考虑到什么,就是他俩之间是不是有可能这个。
13:20
说不准,这个其实没有绝对的对吧,不是说这条流里边的数据一定就比对应的那边支付,就是到账流里边就一定比那个支付流里边那个数据就就会,呃,来的就会迟一些,这个是说不准的,所以是其实是往前往后都有可能,呃这个大家就看具体的数据去做一些调整了,我们这里边就直接给一个比方说往后默认等待五秒钟时间是不是可以啊。对吧,把后面的这些数据都等一等。然后我们这里就是配点event time乘以1000,这是当前的带着的这个时间戳,在它的基础上再加上五秒钟加5000对吧?呃,这样的话代表我我什么时候判断这个就算是没等来呢。
14:12
提出信息,首先如果是乱序数据的话,我们前面可以把这个watermark产生的时候再加一个最大延迟对吧?呃,前面可以加上这一部分处理,然后这里边触发的时候是要靠watermark涨到那个位置才能够触发,而且触发的时间,比方说当前这个支付是啊,这个十点钟支付的话,就是我们检测到这个order pay里边的那个时间,十点钟的话,那是不是后边后边我们是10.0500分零五秒这个时候截止waterwa长到这个时间为止才去报警,对吧?啊,所以就是我们要求是另外一条流里边对应的这个时间戳是不是不能不能要比零五秒要小啊。呃,比零骨表要大不能对吧?零骨表之前只要在之前它到了就可以了啊,那有些同学可能就会想到,我现在有两条流了,这两条流他们的这个water mark又是怎么处处理的呢?这到底按后边我们connect之后按哪条流里边的water mark算呢?对,当时给大家讲过,如果说出现多个上游任务的话,Watermark是不是相当于,哎,当时讲到自己这个任务是不是要对每一个分区去维持一个分区watermark呀,然后要比对他们中间对最小的那个作为当前任务的这个even time的时钟,对吧?所以这里边如果一条流比另外一条流慢的话,我们后边的处理会以哪条流为准啊,慢肯定是慢的那个,对吧?哎,所以这里边这个整体来讲,如果数据这个,呃这边来的比较慢,其实是没关系的。我们。
16:00
这里两边如果做了connect的话,最后处理效果就是要以比较慢的那条流为准,以他的waterma为准,所以我们其实也只是看这个数据里边,它的这个时间不要本身不要比那边迟到太多就可以了,比方说我们还是看一眼啊,这个比方说第一个这个这个transaction啊,在这里搜一下。好,第一个居然就没有,好吧,我们搜第二个。好,第二个这个有大家看这里边的支付的时间是0844,而这里边是0847,所以在这条流里边是不是本身它发生的事件发生的那个时间就要之后三秒啊,对吧?所以说如果我们这里边等五秒的话,它是不是应该可以正常匹配得到,如果我们只等两秒钟的话,是不是相当于这里边就就匹配不到了呀?因为后边只要如果我们数据这里边这条流里边的那个watermark已经涨到四六的时候,是不是相当于就应该要触发那个定时器了,而这里边的数据它是四七的时候才到,对吧?啊,这样的话显然就会没有办法匹配了,所以这个关于这里边这个到底给多少啊,就是这里边的这个延迟,这个定时器的触发时间到底给多少,这个大家还是要根据具体的应用场景,实际数据是什么样的。根据这个去。
17:29
调整好,这样我们就把这个process element1要处理的任务都完成了啊,大家看这是不是有点像我们前面做那个状态编程的时候,分别去考虑不同的情况,对吧?来的是哪个,只不过我们直接是把它合并到两条流里边,就是合并到一条流里边,然后用这个Co process方式去做处理了,上面我们是处理了来了一个order pay的时候怎么去做,那接下来另外一条流如果来了一个receipt怎么办呢?对,是不是一样的处理啊,它们俩是平等的对不对?所以这里边接下来还是同样的处理流程。
18:14
首先我先定义一个配,是不是要从state里边先把它拿出来。先保存到,然后去判断它是不是为空,如果不为空的话,说明是不是另外一条流已经有对应的那个配已经来了呀,哎,这里大家注意,为什么我们可以直接去判断另外一条流里边有没有呢?因为是不是已经做了按照transaction ID做了分区,做分做了分组了,只要这一条流里面当前处理的过程当中能够进来的数据是不是都应该是同一个传达ction ID啊哎,所以到这里边,只要这里边有东西有数,那就是同样的一个交易的对应的那个到账信息对吧?所以这里边我们就判断它有没有就可以,如果有的话,是不是直接主输出流输出一个信息啊,这里边还是配合receipt包起一个二元组来输出就完事,然后接下来是不是把PA要做一个删除啊。
19:20
清空。好,然后接下来else,如果要是没有的话,没有的话怎么办呢?啊,那还是一样吧,是不是先把这个receipt state先更新,把这个先存进去,然后注册一个定时序来等上几秒钟啊,啊所以这里边ctx time service register even time timer啊这里当然大家如果要认为这边就应该提前来的话,那你甚至可以按我们那个automark直接就按当前的时间戳去定义,对吧?啊,有点像我们之前做状态编程里边超时事件那个状态编程一样,那这里边我们还是因为这个说不准的啊,两条流的话,这个事件时间呃,可能确实是有,有可能快,有可能慢,我们可以注册一个receipt。
20:11
当前的event time基础上。再加一个五秒钟啊,所以这只是给大家举一个例子啊,具体实现大家可以考察数据,然后做一些其他的调整,呃,这样的话我们就把process element2个方法都已经搞定了。另外还有什么没没做啊,最后一件事情就是对定时器触发的时候,我们这里还没报警呢,什么时候报警,定时器触发的时候是不是就该报警了,那个时候如果要是发现诶这里边还没到的话,呃,那就直接输出到测试枢纽里面去。呃,所以这里面的逻辑就是。定时器触发,这就是到时间了。
21:05
啊,那么如果还没有收到某个事件,那么输出报警对吧?诶,所以这里面我们要判断的是什么呢?那其实就是要看PA state和receipt state这俩是不是到底有没有,如果PA state里边,诶大家会想到啊,如果它不为空的话,那是不是代表哎,这里边写错了啊。如果它不为空的话,代表哎,如果past it不为空,说明是不是对对面的那个receipt一直没来啊,因为如果对面的receipt来了的话,是不是就会把它清空啊,不是不是这个是清空这个配对吧,如果他来了就会清空这个,所以配如果不为不为空的话,证明receipt没有来。
22:11
Receipt没来,所以我们的操作就是输出一个测输出流,当前的那个标签就应该是on matched paste,对不对啊,只有pay没有receipt,这里边给一个是不是把这个past it它的那个value直接存进去就可以了啊,这是我们当前的这一个。输出配到侧输出流。那同样,如果说大家看这个逻辑是完全类似的,对吧?呃,我不用else,直接后面判断就可以,如果说receipt state它的value不等于空的话,说明receipt一直在,是不是没有调到这个receip state clear这里来啊,说明是不是一直呃这个配就没有来对吧?啊,那如果要是这样的话,我们这里边就做一个。
23:17
Output把它输出到UN matchched receipt里边,那么这里边是receipt state value把它存进去,诶这里边我们处理的只是它俩不为空的情况,对吧?哎,那有有些同学可能就会想到,那假如说啊,假如说我这里边啊,这个也是空,这个也是空,那那怎么办呢?那如果他俩也是都是空的话,正常来讲是不是就应该是在前边,哎,直接这里边那个output应该已经输出一个正常的流啊,啊,所以这种情况其实是我们正常的那种情况,大家其实不用做其他的处理啊,所以这里边接下来还要做清空操作,His state clear,还有receipt state,做一个clear。
24:09
这样就把所有的流程处理完了,最后处理完之后要做这个收尾的工作,所以大家看这就是我们对两条流做一个匹配,然后做一个处理的过程,整体流程其实还是比较简单,比较清晰的,好,我们做一下测试吧。哦,这里大家注意好像没有没有写输出是吧?呃,这等一下我们得停掉,重新来一遍啊。好,这里边我们要输出的话,那应该是process stream,如果这里边直接print。那应该打印出来的是匹配成功的,对吧?我们叫match,那么如果要是想要看那个报警的信息的话,是不是得get side output啊对吧?这里边有两个,一个叫on matched pace,这里边给一个输出on match pace,然后另外还有一个叫。
25:12
Get set outp叫match的IP,同样把它打印输出。好,接下来我们运行一下,看看结果怎么样,这边已经输出了,大家看一下这个结果是什么样子哦,这里边是不是有一些就是检测到是没有匹配到的receipts,有些检测到的是没有匹配到的配对吧?啊就是说明在一条流里边流有,另外一条流里边就没有,那大家看这个match。已经看到的是不是大部分都是都是match起来的呀,啊,这里边特殊的,我们把它放到特殊出流里边去做一个比对就可以了。这就是我们这一部分内容。
我来说两句