00:00
接下来我们做最后一个模块,最后一个需求是什么呢?还是跟这个订单支付相关,那大家可能知道,对于我们这个平台而言,对于一个订单支付事件而言,其实用户那边支付完成还不算完啊,真正如果我们要从安全性的角度考虑的话,我们这里边的账户真正入账是得看,呃,就是本身,比方说你是三方,第三方平台支付的话,你得看微信钱包对吧,得看支付宝账户到底里边是不是有这样一笔收入,有这样一个一个支付的信息,所以对于我们在在有一些场景下面啊,之前也给大家举过例子,就是说比方说像在这个直播平台要快速两两边的这种情况,你充值有可能出现丢失的时候,充值的这个数据有可能丢掉的时候,或者是在另外的一些场景,就是我们及时要核对自己的账户。
01:00
户要保证那边充值即刻这边到账,有可能就要花的这种场景的时候,那这个时候可能我们需要做一个实时的对账了,呃,对于传统而言,我们大家习惯的可能是啊,就是这边支付嘛,第三方平台给我们一个返回,我知道他已经这个交易已经成功,那就完事儿了嘛,至于那边到账不到账,因为传统这个还有中间有一个银行的环节,那银行可能相对来讲这个过程比较慢,我们大家默认好像这个对账到账的这个过程可以比较慢,甚至是可以到下一个工作日啊,甚至是好几天之后,呃,但是大家想我们真的没有这个实时对账的需求吗?其实对于用户而言肯定是越快越好,或者对于我们平台而言也是越快越好,对不对,我及时的检测出来一些异常的状况,如果发现两边匹配不上的话,那其实这个时候就应该做一些操作,至少是你要做一个报警了。所以我们现在其实是要同时处理两条流。
02:03
大家想一想,我们现在应该是有哪两条流呢?一条流就还是这个订单支付,呃,就是我们从那个日志买点日志里边获取出来的数据,大家发现如果那边做了支付的话,其实调取第三方平台这个接口,能够得到他的一个返回,对不对?呃,一般情况是能拿到他的那个交易ID的,然后接下来我们对账的时候去怎么操作呢?是不是要用另外一个任务,或者说我们另外起一个服务去查询这个账户里边发生的那些交易啊,哎,那边如果真正发生过这些交易,两边交易ID1匹配,我们就可以确认这笔真的到账了。好,所以接下来我们其实要做一个两条流的连接,或者说联合处理,那大家想一想,这个两条流的连接联合处理可以用什么方法呢?之前我们讲过对多流转换算子,对吧?对于多条流进行连接和转换,有一个算子叫connect,对,所以接下来这一部分代码我们就用connect,然后大家还记得connect完了之后还得做另外一个操作叫。
03:17
对,Co map或者Co flat map对吧?啊,把它再做一个转换,得到最后的data strip啊,那我们这个过程呢,可能不不是简单的做一个map就能搞定的,那我们可以做什么更复杂的操作呢?用什么算子去做更复杂操作呢?哎,这就是又提到process方式,所以我们可以基于一个connect之后的connected stream调取它的process方法,我们要实现的就是一个Co process方式啊,所以接下来给大家讲这部分内容代码里边我们还是直接就在这个模块下边去创建吧。呃,我们这个就叫做transaction match。
04:06
Object,对于这个,呃,对于这个需求而言,大家会想到除了前边我们的数据来源有一个是这个orderlo,那应该还得有一个,就是我们另外一个任务去去查询整个当前交易,交易完成的交易列表的那个任务,对不对?它的数据也得是另外一条流,所以这里边大家看到在数据里边会有一个叫做receip log啊,这相当于是我们那个收据对吧,IP收据的意思,所以可以用另外一个一个任务把它提取出来,所以我们先把它同样放在resources下边。哦,大家看到这里边它的字段是有几个重要的,主要是哪几个字段呢。啊,首先第一个是不是就是我们那个交易ID啊,对吧,发生的这个交易ID,然后啊,那这里面还有一个支付渠道,大家看到根据这个其实我们也可以统计不同的这个渠道,然后完成的交易,对吧?正常来讲是不是后面应该还有金额啊,我们这里边因为只是做对账,这里边没有考虑金额的话,我们只判断交易ID是不是就可以了啊,所以大家如果要是还要统计跟这个金额啊收支呃这方面相关的一些内容的话,我们还应该把那个金额算上,这里边就是一个渠道,最后还有一个时间戳,所以就是这么三个简单字段,所以在我们当前啊,这个这里边也不需要再去添加什么其他的依赖了,对吧?呃,只要有这个connect就可以,所以在这里我们先把这个样例类创建出来。
05:48
那首先。两条流的输入,一个输入就是就是order event,那我们在其他文件里边都已经定义过了,这个就不再重复定义了,我们这里面主要是要把这个就是接收的这条流对不对?呃,这个叫接收流。
06:11
数据事件的样例类把它叫做re event前面第一个是一个transaction ID,这是一个string类型。后边还有一个一个支付的,呃,就是方式对吧,就是支付的渠道,我们叫pay channel是微信还是支付宝。另外最后还有一个事件时间even time,这是一个long类型的时间戳啊,这就是我们一开始要做的一些准备,接下来这个代码里边啊,大家会想到这个其实整体来讲差不多对吧?啊,那这里面我们还是过一遍吧,Env先创建流式处理的执行环境。
07:10
呃,然后同样这里边还是可以先设置,比方说这个并行度先设好,方便结果输出,然后这里边再去设置时间语义。诶,这里边是前面这个引错了吗?我们看一眼啊。Characteristic这里啊。我们使用英文time作为我们的时间语义,然后接下来哦,大家会想到就是定义我们的输入流对不对?呃,这里边我们有两条输入流,首先一条是是不是order event啊,那这个我们直接可以从这里照抄,对吧?啊,这个其实没什么没什么问题的,直接从orderlo里边把数据读取出来,然后把接下来的这一个呃,数据转换成一个order event类型,然后接下来分配时间戳,然后还要KBY啊,整个这个流程直接抄过来就好。
08:18
呃,这里边本身是一个这个流的处理啊,我们先把它打开,先用这个文件读入,然后做处理,除了这一部分之外,这里边我们读取的是订单流,对吧,订单事件流。在这里大家注意我们已经把它读进来之后,呃,我们要的事件是什么呢?因为订单事件流里边大家还记得那个数据有create有配对吧,还有一些别的东西,我们这里边主要要的是什么啊?对,主要是要这个有支付信息的那些事件,对不对啊,这一部分我们其实直接做一个filter是不是就可以啊?另外后边大家注意,我如果接下来在KBY的时候,我是要针对每一个订单去做k buy吗?
09:13
大家想一想,后续我如果要跟另外一条流去做比对的话,对,其实要根据交易ID去做比对,对不对?所以这里边我们其实是想要用transaction ID,而且前边要去做一个filter操作啊,这里filter比较简单,我就直接可以要求这个transaction ID不等于空就可以了。当然大家也可以写一个,就是要求那个支付的类,就是order的类型行为必须是配对吧啊,当然就是配的话,有可能不成功,有可能拿不到那个ID,那也不行,所以这里边我们必须要求这个存在式ID必须是不为空啊,这是这个订单事件流,除了这一条流之外还有什么?
10:06
还有一个就是支付到账的那个流对不对,支付到账流事件流啊,那这个过程其实也是比较类似的啊,比方说我们叫一个receipt resource。好,然后get class get resource。把当前的这一个文件名称传进来,接下来定义一个叫receipt stream,对吧,还是env fair,这里边传的是receipt source,他的pass啊,那同样接下来的操作是不是还得做map呀,Data。
11:02
Data RA做一个切分啊,我们这里面都是CSV文件,所以都是逗号分割的。这里边把它要包装成一个样例类,Receipt event对吧,Data RA的零,呃,大家看一下它那个类型是什么来着哦,String string long,所以这里边就是直接tri一下就好了。瑞一二,这个要涂了,对,好,我们先做完这个基本的转换,然后接下来常规还是啊,啊,你该filter filter,那这里边可能我们不需要再做filter,因为拿到的都是那个跟订单相关的信息嘛,啊,当然后边是不是还应该把那个时间戳先分配出来啊,这个我们要看数据了,看一眼这里的数据啊,这个时间戳一眼看去的话,大概还是升序的啊,所以这个我们也先就按照升序去考虑了,大家如果觉得这个地方如果我要以乱序去考虑的话,大家也可以习惯就是说把它写成一个乱序的表达,只要指定时间戳,然后按照我们的这个预测啊,给一个最大延迟时间就可以了,对不对啊,所以这个东西大家养成处理乱序时间的习惯啊,这里边我们为了简单一点,把这。
12:30
这个作为升序数据处理还是因time乘以1000对吧?那接下来还要做KBY,是不是也是k by transaction ID啊好,所以现在这两条流他们相当于都根据transaction ID做了一个k buy,那是不是只要他们接下来匹配上的话,两条流一对对吧?这边如果有数据,这边也有数据能能对得上,那其实就肯定是。啊,他们是不是就是成功到账的一个一个信息我们就应该输出啊,或者说如果发现诶有一个有一个有数据,另外一个没有,那这就相当于是不匹配的情形,我们可能就要出输出一个报警信息了,所以接下来我们看一看两条流到底怎么去处理呢。
13:18
将两条流。连接起来共同处理,哎,所以这个连接的过程,那我们定义一个,比方说叫processed stream,怎么连接,怎么处理呢。用connect,所以我们是前边的order event stream.connect是不是可以直接把后面这个传进去,而且大家记得对于这个本身传进来的数据类型有要求吗?没要求,对我们说一国两制对吧?本身他们可以是不同的数据类型,而且可以有不同的处理方法。接下来这里边传进来的就是receipt event stream,然后再做一个,大家看是不是可以直接调map Fla和process。
14:12
所以这里我可以直接给一个process,里边要传的就是一个Co process方式。就是前面我们提到过process function有好几种类,对吧?一般我们用的是kind process function,那这里面再给大家介绍一个Co process function,好,这里边我们定义出来这个类啊,就叫transaction pay match。呃,当然了,下边大家不要忘记做这个执行啊,当前是transaction match。
我来说两句