00:00
那我们就是要实现这个自定义的,这个叫Co process function对吧。实现自定义抠process方式,好,那public static,我们把这个定义出来,Class,当前这个叫transaction pay match detect,那接下来既然他是process大家族的一员,大家知道process function都是都是负函数对吧?呃,所以这里边肯定就是一个抽象类了啊,Extend,一个Co process function,然后大家看到这个Co process function跟之前那个Co map function Co flat map function差不多是吧?它是不是也是三个泛型啊?这三个分别是印一印二,然后al,那大家知道,因为它是一国两制嘛,是不是当前两条流的数据类型可以不一样啊,那我们当前确实不一样,一个是order event,一个是receipt event嘛,然后最后还有一个输出类型,对吧?啊,所以这就是当前这个Co process方式,然后大家看一下里边我们必须要实现什么方法呢?
01:10
大家还记得之前那个扣map吗?里边是不是一国两制的话,就是MAP1MAP2啊,对吧,每一条流具体来的数据单独去做操作就可以了,那现在它这里面也是一样,是不是叫process element1process element2啊。啊,里面的区别是不是就是大家看除了当前输入数据和al的这个输出之外,al.collect对吧?啊,那么还有另外一个上下文,这是不是process function的特点啊,哎,所以整体来讲,前面我们讲完了之后,大家就会看到这个都类似啊,基本上都差不多啊,所以接下来我们先把这个类型写出来,首先第一条流里边的数据的这个类型,哎,大家想当前我们这个谁是第一条流呢?哎,这要看这个不能乱给,对吧,这是要看之前是不是谁connect谁呀,你像之前我们这个是order event去connect receipt event,所以这里边得到的这个connected stream里边,它的类型是不是order在前面,Receipt在后边啊,哎,所以大家看这个stream里边不就是IN1IN2吗?啊,这个类型就是这样去看的啊,当前我们的第一个类型。
02:22
对应的。呃,在在这个transaction pay match里边啊,这里边第一个类型当然就all event,然后第二个类型就是receipt event啊,当然后面还应该有一个输出类型,输出类型好像我们没单独定义,那大家想输出输出个什么呢?好像也没有什么特别需要输出的,呃,就是他俩匹配起来了对吧?这这里是我们主流的输出嘛,测输出流那边是不是已经定义好了,因为测输流只有一个嘛,我就把当前有的这一个直接输出就完事了,那主流怎么办呢?主流也简单,我干脆匹配起来了嘛,我输出一个二元组是不就行了,哎,它俩都放在一起直接输出对吧?所以这里边我定义一个,呃,TEMPLE2啊,直接给一个二元组,诶注意这里边大家不要用这个skyla里边的这个二元组啊,我们还是用Java。
03:20
Java里边的元组类型,然后接下来里边需要定义出来当前是一个order event。一个order event,一个receipt event,这就是我们当前的这个整个的定义,对吧?啊,当前的这个类型就对了啊,那这里面得到的这个数据类型大家也知道了,是应该是一个二元组类型对不对。好,把这个object更改一下,这样的话我们的类型就不报错了啊,然后接下来这里边最关键的其实就是是不是两个process element方法呀,一个process element1,一个process element2,那大家其实知道当前这这个一国两制process element1里边的这个是一个order event,对吧,而且我们已经做过过滤了,它是不是就应该是一个配事件啊,对吧,我直接把它改一下名,那这里边其实就是一个到账事件,Receipt。
04:17
把它同样也改一个名receipt对吧?啊,那接下来大家会想到我这个来了一个配事件之后,这能怎么办呢。那是不是要看之前到底有没有对应的那个receipt来呀,因为现在我们已经按trans ID都已经分组了,所以我是不是可以在里边定义一个k state呀。K sit,就像我们之前那个,呃,Create和配一样,是不是就判断他之前来过没有就可以了,哎,只要有这样一个标志,那就是如果他都来过了,那是不是我当前直接把它匹配起来输出啊,如果没有来来过的话,那是不是自己去更新一个我来过的一个状态,然后注册一个定时器开始等待,所以整个处理流程跟前面那个create配订单超时支付其实是差不多的,对吧?大家看这个逻辑其实类似的啊,所以首先我们在外面要定义状态,定义状态,只不过呢,之前我们定义的是直接定义了一个布尔类型的is配的is created,那现在呢,我们直接干脆就定义。
05:23
就是保存。当前已经。到来的。订单。支付事件和到账事件。所以也就是说,只要这里边有数据来了的话,我就直接把它原封不动的那个事件保存下来就完了,对吧?啊,为什么要保存下来呢?是不是因为我们最后输出是直接把它合到合并到一块要输出啊,这个你就不能,呃,就是我们这里面就不能说只有一个状态就完了,对吧,你得把那个原封不动的数据要输出嘛,所以我们现在定义的这个value state。
06:02
Value state里面的类型就应该是当前的视线类型,对不对?首先是一个order event,哎,这个就是我们当前的pay state,对吧,是否已经有这个订单支付的事件,另外还有一个是。Receipt event,这里边我们注册的话就应该是receipt set对吧,如果说这里边没来的话,这个状态是不是就应该是空,如果要是有东西来的话,那大家知道当前是不是就是对应已经来的那个事件就在里边了,哎,所以这个其实也很简单啊,那接下来我们还是要去。实在一个open生命周期里边get runtime contacts啊,去做一个赋值,首先get runtime contacts,然后get state里边去new一个value state script里边我给一个名称,当前这个就叫做pay,后边是不是order event class对吧?诶,当前这个默认就是空值,我这个不用去单独再给定初始值了。另外receipt state也是类似get time contact,然后get state里边new一个value state script里边再给一个这个名字叫做receip,同样receipt event.class。
07:21
状态先定义出来好,然后接下来我们就是看process element1 process element2到底要干什么事了,那大家想,现在process element1里边其实是。这是既然调到这个方法,那其实大家想是不是就不用再做第一重判断,判断当前数据是啥了,是不是肯定是一个支付到账,到账的这个,呃,不是到账啊,是订单支付的这个事件啊,配的这个事件对不对啊,所以这里边我们是订单支付事件来了,那接下来我们要是不是要判断是否已经有对应的到账事件,所以这个判断非常的简单,我们是不是先把当前的那个receipt那个事件先拿出来,对吧?Receip,然后我直接就从这个状态里边receipt state state里边value先拿出来,那大家想判断它是否已经到了,用什么方法,是不是直接就是如果它不等于none的话,是不是就已经到了。
08:31
哎,如果没闹的话,那就是没到啊,诶大家看如果。Receipt不为空,说明到账事件已经来过,那么接下来我们干什么?是不是直接输出匹配事件啊,对吧?输出匹配事件。然后接下来就是清空状态就完事了,对吧?清空状态这就是我们当前要做的事情啊,啊首先这里边我是out.collect做一个输出,你有一个,当然我们要的是二元组,对吧?这里边二元组也很简单,是不是就是一个配一个receipt啊,现成的嘛,放在这儿了,然后下边是清空状态,那就是pay state clear,然后receipt state clear。
09:28
当然其实大家知道这里边应该是只有receipt里边有配,我们还没保存过对吧?哎,这里边直接清空其实也不影响啊,然后另外大家可能会想到,那如果else呢,如果说这里边没有这个。如果还没有这个receipt怎么办呢?如果receipt没来,那是不是我们接下来是要注册定时器开始等待了,对吧?哎,所以这接下来我们要干的是注册一个定时器开始等待。
10:05
所以呃,这里边我们其实直接就是c DX timer service,然后register一个even time timer,那当当当前本身这个事件是配啊,那我获取它的时间戳,那我等多久呢?这就要看数据了,对吧,两条流看它分别这个事件啊延迟能延迟多久啊,那所以比方说我这里边考虑这里的延迟主要是哪条流比哪条流迟呢。现在是配要等这个receipt,那是不是就是我们之前这个order log里边的这个时间戳,应该他来的比较早,我要等receipt那边的时间戳是不是要比他要晚几秒钟啊,比方说按这个来看啊,具体我就不详细去看了,我们拍脑袋给一个比方说我等五秒钟对吧?哎,那就相当于我这儿加上五。
11:00
然后自然还要再乘以1000。这就是我们当前定义的这个定时器的时间戳,啊啊,那这里边是等待五秒钟。具体要看要看数据对吧,要看两条流的那个数据的情况,然后除了这个注册定时器之外,还得干什么事儿呢?是不是还得更新状态啊,对吧?因为你当前毕竟是来了一个数据,然后要等另外一个数据嘛,另外一个数据来的时候肯定现在有状态啊,所以我们要更新的是那个现在是配来了,是不是pay做一个更新啊,Update,然后把当前的配塞进去是不是就完事了?哎,这是非常简单的一个方式啊。诶,那大家可能会想到,诶,那之前我们那个是还有一个操作,前面清空状态是要删定时器啊,你现在不删定时器可以吗。
12:00
啊,我们先这么做啊,大家先看一下,我我先不把那个定时器删掉,我们先这么这么来做,那大家想这个是process process element1,这里边直接是来了一个配事件之后就分了这么一个if else做了一个判断,那大家想如果要是receipt来了呢。这个过程是不是完全一样啊,因为两条流我们要做对照,其实它是平等的对吧,而且互相之间它要等它,它其实也要等它嘛,两边是互相可能都有一个延迟的,所以在这儿我就直接把这个处理的流程直接copy过来。直接copy,然后这里边就不一样了啊,首先大家能想到当前这个不叫订单支付事件来了,这个应该是到账时间来了,对吧。到账事件来了,那我们是不是要判断是否有已经有对应的那个支付事件啊配事件对吧,所以这里边我们是要拿那个all event里边的这个pay,我们要把这个拿出来。
13:02
所以这里边取这个状态的时候也是pa.value先把这个拿出来,接下来判断,如果配不为空。那大家想,是不是就说明当前的支付事件,支付事件已经来过了?同样现在也是配合receipt,是不是就完全匹配起来了,直接输出匹配事件清空状态,大家看我这个是不是都连改都不用改啊,对吧,直接做这样的一个匹配输出就完事了啊啊,所以这个代码还是非常的简洁的,然后else的话,那如果说这是配没来的话,配时件没来,那当前是不是我也要注册一个定时器啊,只不过我现在应该要注册定时器,就是是不是基于当前receipt这样的一个,呃,它的时间戳,比方说现在我是那个receipt log,要去等order log里边要等它几秒钟,那这个时间有可能不一样,比方说我这个等待的是三秒钟,对吧?哎,那我就给一个三,这里边就是。
14:04
等待三秒钟,具体看数据,那另外更新状态的时候也是更新的是是不是receipt state,然后把这个receipt更新进来啊。诶,所以完全对称啊,两边直接这么写就完事了。好,那接下来大家再来梳理一下我们现在的逻辑啊,现在的逻辑就是。相当于还是先要判断当前来的事件是什么,数据是什么,但大家看这个为什么里边的if else看起来非常简单呢?就跟之前我们那个create pay相比,是不是相当于省去了一重1ELSE啊,最外层的那个事件到底是什么?它是不是因为一国两制已经给我们分开了啊?只要是配它肯定是这个process element1嘛,哎,所以这个就不用判断了,所以我只要判断之前那个receipt来过没有就可以了。啊,所以按照这个过程,假如第一个来的是配,那IP没来的话,那大家想他走的流程应该是到这儿来注册定时器对吧?注册一个定时器状态更新成配,那如果接下来配又来了,呃,就是receipt又来了的话,那是不是会发现我当前配里边状态已经有东西了,那是不是就直接匹配输出清空状态啊?
15:17
哎,那大家看我这里边没有删除定时器对吧?没有删除定时器,那是不是就相当于在后边我们如果要有一个on timer这个定时器触发的话,接下来是不是就会有对应的一个,哎,就会就会还是会掉到这儿来,对吧?所以这里大家要注意啊,当前的这个逻辑定时器触发的时候,之前我们那个因为中间定时器都删掉了,所以定时器触发的时候是不是肯定是有一个没来啊。那现在这个定时句出发的时候是是什么情况,是不是有可能,有可能是有一个没来,一直等到定时出发了还没来对吧,是不是也有可能都已经来齐了,然后状态都清空了呀,但是没没删除定时器对吧?我们这边逻辑少了一块,哎,那少了一块的话能不能搞定呢?其实也能对吧,有可能是有一个事件没来,就是不匹配对吧?不匹配也有可能是都来过了,已经。
16:27
输出并清空状态,哎,那大家想一下,如果要是清空状态的话,会有一个什么特点,是不是两个都都是none啊对吧?哎,所以呢,大家就会想到,那如果要是有一个事件没来的话,这个特点是什么?是不是有一个为囊,另外一个一定不为囊啊,哎,所以现在我们有这个判断逻辑啊,是不是就只要判断哪个不为空,是不是就相当于是另外一个没来啊,如果两个都为空,那是不是说明都来过了,已经清空了,那就什么都不用管了,对吧?哎,所以接下来我们的处理逻辑是判断哪个不为空,不为空那么另一个就没来,所以我们的判断逻辑是这样啊啊,这个我们就相当于利用这个,呃,逻辑上的这个组合,相当于省了一个那个定时器的删除,因为我们删除定时器是不是还得多保存一个状态啊,还得保存那个定时定时器时间戳嘛,所以现在就相当于把这一步省了,这里边的判断就是如果past sit.value拿到它的值,如果它不等于nu的话,它不为空的话,那是不是说明。
17:47
Shift没来啊,大家想想是不是这样,所以我测输出流输出一个报警output,当前应该是on match的pace对吧,因为只有配嘛,只有这个order event对吧?哎,那么这里边输出这个啊,把当前的pay it.value输出是不是就可以了?哎,那另外大家想我这都不用else if,因为直接判断就行,对吧,只因为他俩有没有可能它俩都不为空呢。
18:15
都不为空,不可能只要都不为空,是不是这里面就一定清空了呀,对吧?啊,前面就一定触发已经输出了,所以这里边他们肯定不可能都不为空,所以我就只判断它某一个是否为空就可以了。诶,这里边点value,然后不等于now,如果这个不等于now的话,那是不是ctx output,这是一个on matched receipt,然后receipt state.value对吧?因为它俩不可能同时都不为空,所以我们不可能同时输出这两个测数流,对不对啊,这俩是只能选一个,然后另外还有一种情形是它俩都为空,都为空的话,是不是他俩就都不执行啊,哎,所以这个代码其实很简单啊,那最后是不是还应该有一部。
19:01
还应该有一部,是不是清空状态啊啊,所以。最后一步清空状态,既然已经都输出了,最后K掉就完成了,这就是我们完整的一个处理流程,对吧。好,那接下来我们还是运行一下做一个测试,看一看这个运行的结果效果怎么样。好,现在已经得到了输出结果,大家看一下哦,这里面大家看到这个matched pace大部分都是match的,对吧,然后这里边on matched pace里边有这么几条,On matched PA,这里边有一个34731,那大家知道它应该是在哪里出现过。是不是应该在这个order pay这里边出现过,大家看有一个create,有一个配对吧,呃,这个order ID是这个对吧,那大家想到是不是应该对应的这个交易码应该是。
20:00
是不是在receipt里边就没出现过呀,所以我们再查一下这个receipt果然没有对吧?哦,那另外我们再看还有这个on match的receipt,那比方说这里边这个交易码,我们看一眼这个当前啊,它确实有用微信支付的对吧?啊,30845这个时间戳给的啊,那这里边有没有呢?查一下果然没有对吧?哎,所以这个如果是只出现在一条流里边,另外一条流里边没有的数据,我们这里边就会直接输出一个报警对吧?测殊由会做一个报警,这就是利用这个flink里边的connect连接操作做实时对账的一个过程。
我来说两句