00:00
那我们再来实现另外一个需求,就是所谓的来自两条流的订单交易匹配啊,那其实这个需求主要说的就是一个实时对账的需求了啊,那大家会想到我们现在一般情况,呃,我们在这个平台里边做支付,都是用了第三方的支付平台,大家也看到了我们当前的这个支付请求里边。这里边这个订单数据里边支付之后是会有一个交易码的,这其实就是第三方支付平台给我们确认支付成功之后会返回的一个交易码,对吧?啊,那但是一般情况呢,我们收到这个就认为这笔钱已经到账了,但它并不是真正意义上平台账户已经到账,我们如果想确认的话,那应该还应该有另外的啊一个进程,我们去查询当前平台账户啊,这个数据到底有哪些真实发生的交易对吧?把那个数据提取出来,如果跟这里的订单支付能够匹配起来的话,这才是一个完整交易确认的过程啊,所以这就相当于是一个实时对账了,所以接下来我们在代码里边想要做这样的一个实时对账,那就需要。
01:11
需要处理两条流的数据了,那大家想一下,我们怎么样处理两条流的数据呢?有一个非常简单的方式,就是之前讲过可以connect,对吧?两条流连接在一起,后面我们讲不是一国两制吗?诶,那这个时候我们两条流的数据结构本来可以不一样,那连接到一起之后,各自该怎么样做,自己去定义就完事了,对吧?所以这是最容易想到的一种方式,接下来我们就在代码里面做一个实现。呃,首先我们现在两条流的数据,呃,测试数据的话,我们现在是不是只有一条流啊,只有一个订单支付的这个流对吧?哎,那另外我们应该还得有一个到账事件的流,所以在数据文件里边,我们还有另外一个CSV文件,叫做receipt log.csv。Receip,大家知道是有那个收据的意思对吧?啊,所以这就是应该是另外的呃,一个进程,我们检测到的一个流逝的收到每一笔交易到账的那个信息啊,大家看一下这个还是ETL之后的CSV文件逗号分割,这里边的字段有主要有三个,那大家看一下这三个字段应该分别代表什么含义?
02:23
这边也非常简单,是不是主要就是前面这明显这个奇奇怪怪的这么一串字符串交易码对吧,然后后边。诶,是不是就是当前的支付方式啊呃,另外最后还有一个时间戳,其实大家知道一笔到账信息的话,显然不止这些这些数据,那到账信息是不是至少你应该有一个金额信息啊啊,我们这里边是因为不做那么复杂,不做那个金额的比对了,对吧,我们直接就是以什么为准呢。是不是就是以这个交易码为准就可以了,只要那边诶收到一个交易成功我们订单啊,支付order pay这里边收到一个交易码,那然后如果说我在这个到账数据里边也有对应的交易码的话,那这个是不是就完全没问题了,对吧?我就认为这笔账是已经平了的啊,对账是没有问题的,那如果说这两者中间有一个没有,大家想是不是都有可能出现,都有可能出现问题啊,啊如果说这个order pay这个交易码已经有了,但是IP log这里边没收到的话,那有可能就是啊,当然这个是有可能是第三方支付平台出现问题了,他告诉我们转账成功,但是没有没有把这个钱加进来,对吧?啊,这种情况还比较少见,现在的第三方支付平台大厂还是比较稳定的,那另外一种情况呢,就是我们收到了这笔账,收收据啊,Receip log这里边是有的,到账是已经到了,但是是不是有可能我自己的日志里边订单的那个。
03:53
啊,请求有可能丢了呀,啊,当时的那个订单太多对吧,我们返回的这个数据没收到丢了,你像前面我们那个create是不是也有可能丢啊啊,这都是有可能的啊啊所以这里边我们就要分两种不同的情况去做这个呃判断了,所以接下来我们在代码里边还是先把这个病字下边写一个。
04:15
Po类啊,类似于re病的一个一个po类,大家想我们之前有一个order event,那现在是不是就应该对应的有一个到账的那个事件啊,Receipt event对吧,把这个列出来receipt event。然后接下来里边的字段主要就是有三个,我们也看到了,首先string类型的一个交易码transaction ID,然后接下来是啊,还是string类型的,有一个支付通道,支付渠道,对吧?我们直接叫pay channel,当然pay channel我们可能具体在对账的过程当中并不关心,对吧?就是有一些如果说要想要做一些筛选条件的话,那可能再用这个去做一个筛选就完事了。最后还有一个time Sam时间戳啊,同样还是把对应的空参构造器和带参数的构造器直接都生成出来,对应的还有get set自动生成。
05:19
最后还有一个to string方法。啊,这就是我们当前创建好的输入数据,另外一条流的对应的那个po类型啊,那接下来我们就是可以写主体的这个代码了,当前我们就在order pay detect下边还是这个模块下边新建一个class,这个就叫做transaction pay,呃,Match对吧,做这样的一个匹配啊,就是是否当前的这个交易正常的匹配,两条流能够对账对起来啊,那这里边首先我们还是把这个主方法写出来,Throw一个exception。
06:01
呃,大家想接下来的这个主体流程前边是不是一样啊,就首先应该跟我们那个order pay timeout这里边,呃,读取这个order的数据是不是一样啊,还是把它读进来转换成呃,对应的这个order event po类型,然后ign times sample and watermarks这个过程一模一样,对吧?所以我直接把这个就copy过来。好,把这个先做一个提取,然后这里边我们可以稍微的改一下这个resource,因为大家想到我接下来应该有两个文件,两个resource,对吧,我把这个叫做all resource。那下边这里边我们就用这个all the resource去做一个提取,然后与之对应的下边是不是应该还应该有这个。对应的啊,应该有那个receipt事件里边的所有的数据也要读出来,读成一条流,转换成对应的po类型,对不对啊,所以同样的应该有我还是定义啊当前的这个类型,我们把这个类也是用自己的这个类型啊,Transaction pay match对吧。
07:11
然后接下来这里,呃,同样是transaction pay match class,然后get resource。接下来的这一个是receipt log,把这个相对路径传进传进去。好,这个resource我们可以叫做receipt resource。然后接下来,那当然就是env read text file,直接用IP resource去get pass,这当前读出来的应该是一个string类型的数据流,对吧?然后接下来是不是同样是做一个map转换啊,那这个流程基本上大同小异啊,所以还是来一个拉姆达表达式,把它做一个处理,同,呃,首先CSV文件把它按照逗号切分,切分成。
08:01
String类型的数组,对吧?Fields line去做一个split逗号分割,后面return new一个,我们想要的是receipt event,对吧?里面的字段啊,大家想,前面首先是那个传家成ID嘛,String类型FIELDS0对吧,后面那个page channel也是FIELDS1,直接拿过来string类型就可以了,另外还有一个长整型的时间戳FIELDS2。这就是当前我们做啊转换啊,生成这样的一个po类型的一个过程,那后边是不是还得assign他watermarks啊,这个大家要注意两条流,那是不是我们应该分别读取数据源之后都要去生成对应的water mark呀。大家想想是不是这样,因为呃,有同学可能想,那你这样的话,后面我们不是还要合流吗?对吧?两条流合在一起之后,如果说我这边没有auto,那是不是我就只以这边的water mark为准了。
09:06
接下来是不是就只有这边的watermark在在往下游传递啊,那看起来好像也没问题,但是大家想是不是相当于我下面这条流里边的数据的那个时间就没法控制了呀。对吧,相当于我对于这两条流的那个时间,你要做匹配的话就对不上了嘛,所以这里边必须是两条流都各自提取时间戳,去分配他们的water mark,对吧?然后接下来大家想河流之后的那个water mark是什么规则呢?是不是又来了,又是我们那个类似于之前那个分区watermark的那个原则,对吧,取他们最小的作为当前下游任务的,当前的这个事件始终好,所以接下来大家看一下这个,那具体来讲的话,那得看数据了,对吧,到底是生序还是乱序,我们看一眼。454748啊,这个整体看还是升序,所以还是直接来一个,你有一个a sending time step structure,这里边要提取的还是element里边的time stamp,大家也看到了,是秒对吧?所以这里边直接给一个乘以1000就可以了。
10:10
好,到这一步这里我们就得到了对应的这个到账事件,对吧,我们这里边是读取到账事件数据,上面这里是读取订单支付事件数据,这里边还缺了一步,大家想一下,就是我们这里边完整的读这个order event啊,就是读这个order log里边的数据的话,是所有的数据拿出来都能用吗?其实也不是,因为这里边是不是大家看到有一些是下单的那个数据啊,下单的数据跟我们两条流去做匹配,做对账,是不是一点关系都没有,我们要的就是那个支付数据对吧?哎,所以这里边关键在于是不是必须得是那个配事件啊,所以我们在前边这里在定义当前的这个,呃,Po类型,已经map成这个po类型之后,接下来是不是应该提前做一个filter啊。
11:12
只要选取那个支付数据是不是就可以了,对吧?诶所以诶这里啊。Filter filter这里边我们提取的是当前啊,就是可以取这个。呃,就是当前我们的那个事件类型是是支付是那个paid的,或者我也可以直接要求,就是它有那个传单ID是不是就可以了,因为大家想后边我这两条流要和连接在一起的话,是不是它应该有一个连接的标准啊,就相当于我们那个表做状语的时候得有一个连接条件一样,对吧?啊,你说我这里是两条里边所有的数据都去做一个连接匹配吗?当然应该是同一笔交易,同一个交易码去做一个连接匹配,对吧?哎,所以这里面我们其实必须要让那个交易码transaction ID不为空,所以大家想这里边我的条件是不是就是非取一个非就是空字符串equal字data点。
12:16
当前的transaction ID是不是这样去写啊,对吧?呃,所以这就是我们这这个就是交易。交易ID不为空,其实也就是说必须是配事件对吧,必须是支付事件,这才有意义啊,这就是我们前面要做的这个操作,然后接下来已经把这两条流都已经读进来了,那接下来其实就是两条流是不是要做连接。连接在一起河流,然后接下来做匹配了是吧,我们接下来是将两条流进行连接合并,然后呃。
13:06
处进行匹配处理,所以这里边其实我们首先是all the event stream,那大家想一下,接下来我是不是应该要去connect呀?啊,所以这里边有两种方式啊,大家应该还记得我可以先去connect connect,另外一条流就是这里边我们的receipt,这里我可以给一个。大家看这里面我可以给个名称对吧,刚才没有给名称啊,这个我就叫做receipt。对吧。好,这是我们的这个两条流received event stream可以直接连接这个得到的其实是一个connected streams,然后后边其实大家看到在这个里边我能不能指定它的连接条件呢?可以的,里边是不是我还可以去K啊?
14:04
大家看到是质这样对吧?KBY这里面是不是就要指定两个position位置,或者两个大家看string类型的字段对吧?啊,或者是后边我是不是可以指定两个k key select啊,可以这样去指定,这样的话就相当于就有我们的这个连接条件了,对吧?啊,当然其实大家会发现它的底层其实是干了一件什么事呢。其实还是就是把我们的每一条流是不是先做了一个KBY,然后再把它合并到一个connected streams里了啊,所以这里边还有另外一种实现方式啊,就不需要先连接起来再K也可以,怎么样呢?对,先KBY,然后再去连接,自然它就是按照这个K去做分组的,对吧?啊,然后这两条流合并在一起之后,也是对应的这个K的,呃,所有的数据,那大家想一下,现在我们的这个K应该是什么?
15:00
按照什么KB是不是就是那个交易码啊,对吧,所以这里面我们首先要的是大家看不是order ID,我要的是transaction ID对吧?啊基于这个啊,给一个方法引用传进来,然后后边。接下来才是点connect去做这样一个连接,连接的话连,那如果说这里边我已经是KPI之后的这个操作啊,那是不是连接我里边的对应的这一个也要指定它的K啊,所以我应该是一个k stream,再去连接另外的一个,是不是连接另外的一个k stream呀,哎,这里大家可以看一下啊,就当前我这里边去connect的时候,如果说你直接传一个data stream传进来的话,那它只是把这个两条流做一个连接,那后边连接的这个。条件到底是什么呢?就是我们当前两条流里边指定的K,所以你在后边指定也可以,在前边直接KY指定也可以对吧?哎,所以在这儿我们就连接的时候直接就把它KY出来,那当前这个receipt里面不也是一样吗?来一个receipt event get transction ID对吧,这样就把它连接好了。
16:12
得到的这是一个connected streams。然后接下来做什么操作,要处理它怎么操作,大家还记得我我我们是不是当时就讲过可以map里边要传一个com map function一国两制对吧?或者flat map是不是传一个扣,Flat map function一国两制啊,哎,这个都是一样的啊,那我们现在直接map Co map扣能够搞定吗?哎,这个就涉及到一个问题,大家想我这个一国两制的时候,不是说只对当前的这个数据做处理就完了,是不是还得判断它的另外一条流里边的那个数据来没来啊,所以接下来就类似于我们之前那个create和配,有了create之后要等配,有了配之后要等create,对吧?那我们现在是不是也是现在我们是一个配一个。
17:02
Receipt一个到账数据,所以就是如果一条流里面的支付配来了,我要去等待receipt,那receipt来了,是不是同样也要等那个配啊,那这里面就涉及到一个问题,我等的话等多久呢?不能无限等下去,还是那个原因,如果无限等下去,相当于我们的状态一直不清空,一直在等待,对吧?啊,这个就是肯定在性能来讲是不能这么去设计程序的啊,所以我们这里边要设置一个等待的时间,当然这个时间的话,具体来讲,那其实还是应该就是大家在这个呃,当当前啊,具体来讲的话,大家其实就是要到这个数据里边去看两条流里边的数据到底是一个什么样子了,对吧?啊,就比方说我们这个receipt这里边的这个数据这个事件啊,然后跟这边的order order pay的这个事件,呃,这这个是order啊,然后跟这个reip的这个事件,大家看他们俩的这个延迟,到底谁比谁能延迟多少,对吧,然后就看他们俩到底要等多少多长时间啊,那他想这个时间是不是相当于没准啊,两边应该都得等对吧?啊,就是他们互相之间的这个,因为本来就。
18:17
不是两条流嘛,这个时间其实就不存在,我们说这个本来是按照顺序来的啊,它本来就应该是乱,就有可能先,有可能后,所以接下来既然是要等待,那我们想想应该怎么等,是不是?最简单的方式就是直接去定义一个定时器,然后去隔几秒之后出发就完事了,该等几秒等几秒,对吧,那要定义注册这个定时事件的话,是不是就只有。前面我们讲的这里边这个connected stream能够调的那个API啊,这个map和flat map能能够去定义注册这个定时事件吗?这也不能,我们是不是就只有。Process了啊,之前没给大家讲这个,那现在刚好又把这个process也用起来了啊,啊,那这个process里面大家看要传的就是一个Co process function对吧,它也是process function式大家族里面的一员啊,也是一个底层的API,后面大家可以看看,看一看这个东西到底怎么样去做啊,那这里我还是先把它声明出来,自定义一个,我把它叫做transaction pay match detect对吧。
19:25
检测这样的一个操作啊,那最终我应该能够得到一个结果,我把这个就叫做一个result stream,那大家自然想到了,就是我最终检测应该是检测到什么呢?是正常匹配的事件对不对,那如果要是不匹配的事件呢,只有一个,只有一个配,没有另外的,那大家想是不是应该给测输出流输出啊,输出一个报警对吧,还是分开,所以这里面就涉及到。进行匹配处理对吧?如果不匹配的视线是不是应该输出到侧输出流啊,那这就涉及到侧输出流,我们是不是又涉及到那个测出流标签了啊,我还是放在上面啊,直接定义这个定义测输出流标签,哎,那么这个测输流标签的话,大家可以我们可以直接定义成两个,因为大家想两条流嘛,某一个来了,另外一个没来,可能这是两种不同的情况,我们可以多定义几个啊,就相当于输出到不同的流,大家想测出流我们没要求说只能有一个对吧,有多个的话,是不是用不同的标签就可以表示啊,哎,所以这里边我们可以定义两个啊,首先定义一个,这个还是final static,然后给一个output tag。
20:51
首先呃,我们应该是来了一个order pay的事件,但是没有找到对应的到账事件,那这个数据,呃,大家想这是这是不是相当于就是一个没有匹配成功的配事件啊,对吧?呃,我们就定义这样的一个order,那里边数据存什么呢?那自然我就把当前的那个order even存进来就完了,对吧,它没有找到对应的那个receipt嘛,我就只把它孤零零的单独存起来完事,所以我把这个叫做on matcht pace没有匹配起来的配饰件,所以这里边你有一个output tag,然后里边给一个名称叫on。
21:32
Match matched pace,然后另外加一个画括号对吧?啊,那自然我们想到了,基本上非常的类似啊,同样也可以定义一个UN matchched,是不是receip呀,这里边给一个receipt,同样这里边就是只有一个receipt事件来了,但是没有对应的order pay那个事件进来,我就把它保存在这一个流里边,测殊入流对吧?未匹配的到账事件类型改过来,Receipt event,这里面标签也要更改过来,Receipt。
22:12
好,这就是我们这个特殊流标签的定义,那下边要打印的时候,是不是就相当于我们有三个结果都可以打印对吧?首先主流result STEM直接打印输出的,这应该是是不是正常匹配出来的呀,对吧,Matched pace,这是正常正常输出的一个匹配流,然后侧输出流里边get set output啊,首先我们有on matched pace。这个做一个打印输出,这是UN matched pace。同样,Result,呃,Result stream,结果get set output,测出出流,取这个on receipt,也可以有一个对应的测出流,打印输出on matcht receipt。
23:06
哎,这就是我们这个完整的一个处理流程,后边env execute执行起来,当前这个叫做transaction match job。这就是主体流程。
我来说两句