00:00
接下来我们再来实现最后的一个需求,那就是要做一个订单到账的实时的对账,实时的匹配,哎,那这其实主要说的就是对于这个订单支付事现而呃而言啊,我们用户那边做了一个支付操作之后,其实还不算真正的这笔账已经打到这个平台了啊,大家想到就是真正如果到账,其实你需要到平台账户上上去确认,对吧?这其实完整来讲的话,它也是需要有一个事务性保证的,就是这边呃,我们给他平台上这个钱充,充值进来之后,呃,如果是充值操作的话啊,充值进来之后我要给他加上,然后这边呃,平台账户应该是真正到账了,这个才算才算数,或者说如果他那边去做了一个这个呃,就是交易行为,如果我们通知这个商家那边要发货的话,那我们最起码得保证这个平台账户收到这笔钱,然后可以给那个商家去打款,对吧,要不然的话,这个是匹。
01:00
配不上的,最后这个对账就会出现问题,那之前我们一般情况对这个是考虑的会比较少,因为大家可能会认为这是属于一个就是金融上面做对账,最后做核对啊,做核算的一个工作,那往往呢,就会放在呃,就是之后我们做一个批处理,做一个离线的对账,或者说呃,就是大家可能会想到,就当前一般情况下,我们这个做交易的时候都是基于第三方支付平台,对吧,现在第三方支付平台已经越来越稳定了,出现这种诶用户那边如果发生呃,发生了这笔交易,然后呢,第三方支付平台没给我们到账,这种情况其实很少的啊,那大家可能就觉得这种情况一般不用处理也也没问题,就是我们如果信任充分信任第三方支付平台的话,这是没问题的,但是另外还有一种情况就是假如那边第三方支付平台已经到账了,但是我们这边丢掉了他的那个返回信息,对吧,我们认为用户这边没充上,这也是一个问题,所以其实我们还是要,如果要是。
02:00
出于这种检测的风控啊,出于一个更高标准的风控要求的话,还是要把两条流的数据做一个比较,做一个对照啊,那接下来我们就来做这件事情,那我们具体用什么思路来做呢?现在就相当于应该有两条数据流对吧?一条数据流就是前面我们看到的用户的订单事件啊,除了这个下单的事件,还有支付的事件,我们现在最关心的就是支付,那就是如果用户配了啊,做了一个这个配置操作,接下来我们把它提取出来,然后另外还需要有一个什么呢?呃,那可能是另外的一个进程,或者说我们另外要起一个这样的一个轮询任务,不停的要去查平台账户的一个到账信息,那么这个信息查出来之后就应该是另外一条流,对吧?呃,这两条流我们就可以做一个匹配,所以接下来我们还是在代码里边啊,实现一下这个需求,呃,当前我们就还是放在order pay detect下边去。
03:00
新建一个object,当前的这个object,呃,我们名字就叫做这个是跟交易相关啊,我们就叫做transaction match吧,匹配啊交易匹配大家知道,就是一个实时对账啊,然后然后我们现在首先大家考察一下我们现在的这个数据,呃,首先要有一个这个orderlo,对吧?这个我们已经引入了,就是要有订单的事现流,另外还需要有一个就是对账的时候查询我们平台账户到账信息的一个流,对吧?诶这个我们也是有一个测试数据已经引入进来,叫做receipt log,大家看一下这里边主要是一些一些什么样的数据结构啊,啊大家看还是CSV文件逗号分割三个字段,第一个字段大家看这个很诡异,乱七八糟的这样一个数,这是什么呢?诶,这不就是当时我们这里边后面不是有这样的一个数吗?对吧?哎,这就是我们说的那个订单的交易ID对吧?啊就是。
04:00
大家可以认为那个流水ID啊,我们拿到的那个那个ID,然后对应的我们在到账信息那边是不是也有应该有一个同样的交易ID啊,对吧,两边这个应该是同一个啊,所以大家看这个到账信息里边首先是一个交易ID,然后后边还有一个啊WeChat,阿里pay,大家知道这是第三方支付平台的那个字段,对吧,到底是哪个支付渠道?呃,然后另外还有一个就是时间戳了啊,当然正常来讲还应该有一些其他的信息,比方说呃,支付的金额对吧,如果说我们想要去关注的话,应该把它也要提取出来,现在我们只只关注就是它的那个ID匹配上没有,这里边就没考虑这个金额之类的东西了啊,如果我们想要做更深入的这个呃校对的话,那还需要把那些数据也提炼出来啊,所以接下来我们在这个代码里边trans match里边,是不是前面还是得把这个样例类定义出来啊,啊,这里边我们定义一下这个order event的话,前面已经定义出来了。
05:00
现在我们关键就是要定义啊,就是到账到账事件样例类,我们现在两条流嘛,一条流已经有了,现在是另外一条流re receipt,我们把这个叫做receipt event这里边的呃,数据的类型啊,里边transaction ID,这是一个string类型,然后后边有一个支付渠道,我们叫做pay channel,这是一个也是一个思命类型,最后还有一个time stamp长整型时间戳啊,把它放在这里啊啊另外我们输出什么样的数据类型呢?诶,那大家会想到现在我们如果要是说检测他俩是否匹配的话,是不是就应该是有一个呃,有一个订单ID对吧,有一个transaction ID,然后呢?呃,如果既有他的那个order order的那个paid的事件,又有当前的这个receipt的这个事件,那就相当于这个就匹配上了,对吧。
06:00
啊,那我最后也不用输出,就是定义那个输出的样例类了,我干脆怎么样就把这两个事件拼到一起,拼一个二元组给你看不就完了吗?对吧?啊,那大家会想到,那假如说这两条流里边不匹配,就没有匹配上,那怎么办呢?那也简单,比方说如果来了那个支付,来了那个pay的事件啊,Order pay的事件没有来receipt事件的话,我就把那个order pay单独给你看,对吧?呃,就他来了单独来了receipt没来,然后如果要是说来了receipt没来那个order k的话,我也把它单独拿出来给你看,对吧?哎,所以这个其实输出的这个数据类型我们就不单独定义了,这个就大家知道是怎么回事就好了,接下来我们来定义。接下来主体主体的这个流程啊,那我们会想到跟之前应该差不多是吧,啊,其实至少我们是要从那个订单流里边去读数据的嘛,哎,所以我还是啊之前这个order timeout,先把前面前面这个环境我还是该定义都定义出来,然后这里边我先从这个呃,就是当前文件里面去读取数据,然后把它转换成样例类,然后再分配时间戳和watermark。
07:12
接下来就在这个代码里边,把这个先通通引入,呃,然后在这个处理的过程当中,我们把这个transaction啊这个引入。呃,然后上边大家看这里还有一个这个get class啊,这里边还在报一个错,我们看一下到底是哪里有问题。哦,后边这个case写错了是吧?Case class啊,然后接下来后边的这个流程里边,那就不需要引入别的东西了啊呃呃,然后我们现在是读取这个当前的这个事件流啊,这是order,大家看一下这是这是什么?从文件里读取,我们直接就说读取啊订单事件数据对吧?订单事件数据,然后同样的接下来是不是还要再做一个读取收据,就是到账事件对吧?读取到账事件数据这流程是不是跟上面差不多啊哎,对吧,这个我干脆就直接copy过来算了啊,代码都是一样的抄,只不过这里边我们稍微要改一改,这里边就不叫order log了,而是要叫IP log,然后后边你前面那个如果叫呃resource的话,我们这个改一下,这个叫RESOURCE1,这个叫RESOURCE2。
08:34
后边我们这里边也直接用RESOURCE2去get past,那后面这个不要叫order event stream对吧?啊,这个还是改一下叫receipt event stream,然后这里边我们去做包装的时候呢,呃,也要把它改成一个receipt event,然后里边啊,那这个字段就不一样了啊,首先第一个传达成ID还是个string对吧?第二个那个channel也是一个string,哎,这个就不用说了,最后还有一个对这个图long,第三个字段图long就完事了,然后我们看一下后面这个分配时间戳,那还关键是得看它到底是升序还是乱序对吧?啊,这里边我们看了一下,整体来讲还是升序基本上是排好序的啊,所以因为乱序我们之前已经比较熟悉了啊,所以这里边不再去强调这个乱序的处理了啊,所以我们就直接用升序的处理,Time s乘以1000,大家知道默认延迟一毫秒啊,有这样的一个啊定义就可以了,然后接下来我们在做这个处理的时候,大家就会想到接下来要干什么。
09:35
这呢,哎,接下来那就得把当前的两条流要去合并做处理了,对吧,而在这个合并的时候,这里边要做一个河流操作啊,合并两条流进行处理,那现在河流怎么合呢?之前我们讲过这个河流的操作有connect,有UN,我们到底用什么呢?
10:01
之前我们说过,两种河流方式的主要区别在于,一个是一国两制,就是可以是不同类型的流里边的数据,然后合在一起,然后我分别去做对应的操作,对吧?然后另外一种呢,是里边的数据类型必须一样才能合在一起,大家想想现在两条流里边的数据结构一样吗?当然不一样啦,你一个是receipt,一个是order嘛,对吧,当然不一样,所以现在我们要用的是connect,对啊,这里边我们就定义一个啊,比方说这个我们定义直接就定义result stream了啊,不做中间的那个转换了,那就基于之前的all event stream,要去connect,去连接另外的一条流,也就是我们这里边的receipt even stream,对吧?然后这个连接的过程当中,大家会发现啊,在做这个连接操作得到connected的stream之后,大家知道现接下来我们要做的操作其实是针对什么的呢?难道说这两条流里面的数据你直接。
11:02
它连在一起之后,所有数据都混在一起做操作吗?我们其实还是要捡取同一个ID的数据,对吧,对吧,同一个transaction ID这个才有效嘛,所以大家看到接下来可以做什么操作,可以做一个KBY对吧,或者我前面也可以怎么样呢?这里给大家介绍另外一种情况啊,就是我可以在前边就把当前的这个K先都定义好,我先定义成当前的这个KBY,应该是trans transaction ID对吧?啊,先按照这个做一个分组,然后下边的这一个receipt里边,同样它也是KBY,按照当前的transaction ID做一个分组,然后大家看现在我这个调用的时候,这个相当于还是做了一个data stream API里边啊,就是一个data stream API调另外一个data stream啊,调它的connect的方法,传入另外一个data stream,得到一个connected streams对吧?那这里边本身是可以做这样的操作的,然后接下来我做的操作呢,就默认都会基于同一个K里边的数据,然后把它们去做匹配,去做定义啊,所以接下来这个就方便多了,对吧,啊,你也可以是两个先合在一起之后,然后再去K败啊,这个一般情况我们可能是先K败,因为它两。
12:24
它俩的这个字段有可能不一样,对吧?啊,就是这里边我们就是你先去分别去KY之后,这个可能会更加的方便一些,好然后接下来我们通过这个连接之后呢,后面做什么操作,那就是因为大家想到接下来是不是还涉及到一个有可能会需要去等的一个过程啊,啊对吧,那我们想到如果说先来了一个一个那边的那个订单的一个支付的事件,一个order pay,然后呢,我们要去等这个receipt,那到底等多久呢?这是不是就得看两条流里边的数据,大概是他们这个延迟有多久,对吧?啊,你就得大概的判断一个这个了,那这个具体实现怎么实现,肯定还是一个定时器嘛,对吧,设置一个定时器等一会儿时间就可以了,所以接下来既然用到了定时器,当然就要用process function了,对,这里边就是process里边定义一个自定义的啊,比方说这个我们就叫transaction。
13:24
Match result的一个处理,定自定义这样的一个处理函数,那最后我们做一个呃,Result stream去做一个打印输出,最后不要忘记还有execute把它执行起来,当前我们这个叫transaction match job,这就是我们整个程序的处理流程。
我来说两句