00:00
接下来我们继续给大家讲下一个拈啊,这里边有接下来有新的需求出现了,这个拈呢,主要就是跟我们业务系统里边另外一个非常重要的环节有关,那就是下单和支付的环节啊,大家知道在电商平台里边,其实这个环节是非常重要的,因为最终创造利润啊,创收的这个环节,真金白银的环节就是这儿对吧?啊,就是要等这个用户下单,然后支付这个环节才能创造真正的收益,那所以我们对于这个用户下单和支付的行为,其实是应该有更多的安全保证和风险控制的,所以这里边给大家提出一些风控指标,主要就是针对订单支付这个环节的,那首先我们提一个什么呢?啊,大家会想到就是说,呃,首先就是用户的这个下单下单行为啊,大家可以认为它也是用户对于商品的一个一个需求,或者说。
01:00
一个兴趣对吧,首先是可以作为这样的一个考量,然后另外还有一个方面就是说,哎,我们这里边下了单之后呢,用户下了单之后呢,他也并不是每一次都会直接支付对吧,甚至有一些就是用户他可能拖很久他都不支付,所以这里边一般情况我们在处理的过程当中,业务系统里边会给用户的订单设置一个超时时间啊,这个大家都非趁理解,我们一般用这个电商网站或者相关的一些购物的时候啊,支付都会有一个超时时间,对吧?哎,那这个它的含义是什么?一方面它要给你一个紧迫感,就是你如果看到一个倒计时不停的在在减的话,你肯定就想,哎呀,要要,要不我赶紧支付了吧,对吧?呃,到时候这个到时了之后,我还得重新下单,很麻烦,如果说没有这个紧迫感的话,那大家肯定的想法就是,那就算了,对吧,我先放在那儿,然后想起来的时候想想付再再付呗,那这个就下订单就没有没有意义了,就跟我们相当于一个购。
02:00
库车收藏夹一样了啊,这这个当然是需要尽量需要去避免的啊,需要提高用户的支付意愿,那另外还有一个,其实从业务逻辑来讲的话,设置超时时间也是必须的一个环节,比如说大家想一下,我们在买票的时候,这个12306对吧,这个就非常的明显,你说你如果下了订单之后,你直接占着这个票,那那你想是不是相当于我们库存里边这个票为减一啊对吧,你已经下了订单了,你没有支付,那这个这个别人不能直接下单去买这张票啊,万一你又支付了呢,你不能一张票卖卖两个人嘛,所以这种情况下我们库存是要减一的,如果说你一直不释放,一直站着的话,那相当于本来这个东西没卖出去,结果最后还导致别人都买不了了,对吧,真正想买的人买不了了,所以一定要设置一个超时时间啊,那传统的业务数据库里边大家也知道啊,都是用业务系统去搞定这件事儿的啊,就一般情况我们在呃,数据本身的这个业务系统里边,可能你。
03:00
啊,不停的去判断一下当前这个订单,诶下了多长时间了呢?当前这个订单这个是不是要更改一下状态,变成一个超时状态呢?啊可能要有一个服务专门去做这件事儿,那另外还有一个稍微简单一些,或者说容易实现的方法,就是把它直接扔到red里面去啊这个大家也比较熟悉,就是red有这个K自动失效时间嘛,如果说我检测到诶那边已经失效了,那我这边直接把它的状态去做一个更改,变成一个超时状态,这是现在大家可能在生产场景里边常见的一些一些应用的这种行为啊,那这里边我们为什么要给他,给大家再用这个flink做一个实现呢?主要想到的就是诶,就一方面,假如说这个数据量特别大,你怎么办?如果数据量特别大的话,大家知道对于业务系统而言,是不是这个本身来讲,就是这这是额外的一个负担,对吧,我要不停的去看当前这个订单的状态到底是什么样的。
04:00
啊,对吧,去不停的,相当于我另外得有一个进程去不停的检测轮询当前的那个订单状态,然后呢,如果发生了,呃,这个更改啊,就是更新的这个状态的话,我再去数据库里边去做一个调整,所以说这种操作其实对业务数据库是一个很大的负担,如果说数据量又特别大的话,那其实可能对我们整个业务系统的性能会造成很大的影响啊,那另外一个角度,我们考虑它是不是跟当前业务系统的核心业务流程其实并没有特别大的关联,对吧?哎,你其实这里边用户正常下单嘛,该下单下单,该支付支付,其实这个超时,这是另外的一个服务需要去做的事情,跟我们常规的业务系统想做的处理的逻辑它是两回事儿,那为什么我们不把它提取出来呢?把它剥离出来呢?哎,所以这里边就提供了一个思路,我们可以用flink实时的流处理系统去做这件事儿,哎,那之前为什么大数据处理系统不能做这件事呢?因。
05:00
因为它对于这个时效性要求还很高,对吧,你想这个,呃,本身这个用户的订单啊,比方说我们定义十分钟或者15分钟之后,这个订单就超时了,那他超时之后,我这里是不是应该立刻有一个响应,要更改用户那边的状态,订单的状态,对吧?呃,你不能说是这个已经那边已经超时很久了,我这边跑一个批处理,过了一个小时之后,诶,呃,这个更新状态说你那个订单超时了,那在这个过程当中,他那个订单还是被占据着,别人不能下单的呀,你如果出现就是像我们那个,呃,就是像这个,呃,双11或者说618这种购物节的这种场景,订单量非常大的那种场景,你这个处理就根本处理不,根本处理不过来了,对吧?呃,你你这个批处理这个延迟会造成很大的延迟,用户体验会非常的差,所以这里边我们就需要用延迟更低的这种流式处理系统来做这种做当前的这个业务管理了,对吧?啊所以接下来给大家讲一讲。
06:00
订单支付失效的这样的一个实现,好,那接下来我们还是在代码里边新建一个模块。当前的副项目下边啊,New一个module,然后当前我们这个,呃,这个模块其实主要是跟订单和支付相关,对吧?呃,我们首先是考虑这个订单的失效,所以我们整个这个模块叫做order pay,呃,Detect吧,检测啊把这个先定义好,然后接下来在当前的这个模块里面,我们需要引入什么依赖呢?哎,这个大家想到可能接下来,因为接下来我们要检测的是什么,又是有不同的行为,对吧?比方说你像这个超时事件,大家大家想这个如果要是说超时的话,我们应该检测用户什么行为呢?怎么样去处理这件事呢?那是不是应该用户的这个订单行为里边,我们能够提取他的这个行为日志对吧?啊,我们买点日志应该能够看到这样的行为,就是诶用户下了一个订单,然后用户支付了一个订单,对吧?哎,或者用户修改了一个订单,对于订单的操作,他应该都是一个用户行为,都是日志里边的一条数据,如果我们接收到这个数据的话,接下来就可以判断了,那什么样的情况下叫做超时了呢?我们应该去做一个报警提示呢?
07:23
那这种场景是不是就应该是,哎,对,就是我要定义一个,比方说15分钟对吧,那就是,那那我是直接定义15分钟的窗口吗?滚动窗口吗?每隔15分钟判断一次吗?不是的,这个还是之前我们说的要跟用户的行为相关,就是用户来了一个create订单啊,就创建订单下单的这个行为之后,是不是才开始15分钟计时啊,然后等到15分钟之后,如果发现他还没有支付这个订单的话,检测到了,诶那我们报警对吧?诶所以大家发现这个是不是我们用process function可以实现,你注册一个定时器嘛,来了前面一个数据之后,诶来了一个这个下单事件,然后注册一个定时器,然后等那个支付对吧,一直等到他来,如果他没来的话,哎,那那就相当于报警,哎,那另外大家会想到是不是我们可以用CP去做一个实现,这不是典型的前面来了一个下单,后面来了一个,呃,这个支付对吧?呃,如果说我要求。
08:24
分钟之内,如果说15分钟内内之内出现这个匹配的话,这相当于是匹配成功对吧?就相当于是成功支付的订单,如果说要是15分钟之内没成功,而且大家想这是一个什么超时事件对吧?那就是你有前面的那个create,就是我们说的有开头没结尾对吧?料到了开头没料到结尾啊,你你不能说这个用户连连下单都没下单,那那就那就叫做不匹配对吧,没有匹配上,那我现在是什么呢?开头已经匹配上了,有下单,呃,后面也不是说不匹配,就是说只是我还在等没等到对吧?呃,那那这里边我时间截止了,那这个时候我其实是一个超时事件,用cep的超时事件可以把它提取出来,对吧?哎,所以我们用cep可以对它做一个实现,所以这里边我们在po文件里边,先把这个cep相关的依赖先引入,对吧?呃,因为之前我们在做这个,呃,CP依赖引入的时候是在这个。
09:21
之前我们讲那个logging啊,登录的这个模块里边单独子模块把它做了一个引引用,所以我现在还得单独引一下,除非你前面是直接把它放在那个副项目的po文件下啊,我们现在就单独指定吧,现在是这个order pay啊,把之前的这个CP的依赖还是先引入进来,然后接下来当前当然就是原文件还是要放在s main下边,我们改个名,管这个叫做skyla,然后诶,当前我们首先是有这个需要的数据,那当前需要什么数据呢?大家回忆一下user behavior里边好像没有下单,呃,当然是有BY对吧?有BY那个行为,但是没有下单行为对吧?就这个没分开,所以现在我们还是得用自己定义的一些行为数据去做操作啊,所以接下来我们还是在数据数据这个data里边啊,呃,定义了一个,大家看到有这个order log,还有一个receipt log啊,当然这个reip log是后续我们可能需要的啊,这里边我就直接先给大家把这两个。
10:22
这个文件都copy过来吧,这都跟我们当前这个订单支付相关,对吧,我们现在主要要的是就是这样一个orderlo,大家看一下这个orderlo主要是什么呢?这里面我们也做过1TL里边主要需要的这个数据号分割,哎,那前面大家想到有一个ID,那这个ID是什么呢?还有呃,同学说这个U的ID嘛,对吧?哎,注意看啊,后边就没有别的ID了。哎,所以说如果说我们ETL之后已经没有别的ID的话,那这个应该那如那那就应该是什么?对,就应该只有一个订单ID对吧?因为我们这里边关心的是什么呢?就是每个订单的一些这个下单和一些支付的操作,对吧,就是我们现在没涉及到这个用户的一些行为,你比方说要从里边要想要去提取,比方说这个用户他是不是有刷单行为啊,呃,或者说这个用户他是不是他下单的这个习惯性是隔多长时间去支付,对吧,你要提取这些用户的一些行为特征的话,做用户画像的话,那可能这个时候是你需要有这个对应用户的信息的,我们这里边不涉及,对吧,你只判断这个订单到底是超时了还是没有,那是不是只需要订单ID就OK了呀啊,所以这个提取的时候已经不考虑user ID了啊,这里就是订单ID,然后另外还有一个字段,一个字符串create或者是配,那就表明当前最事件的一个呃,一个类型,对吧,到底是创建下单还是呃就是。
11:50
做了一个支付,那另外还大家看到这里create create空了一个字段,然后这里边这个支付这里呢,多了一个奇奇怪怪的一个字段,字段这个是什么东西呢?只有这个支付才会有的一个对应的这样一个字段啊,大家可能可以想到就是相当于我们支付的时候是有一个交易行为的,对吧?那所以是不是有一个交易流水号啊对吧?啊,相当于是有这样的一个交易ID,所以这里边是一个transaction ID啊啊这里就直接放在这儿了,那当然就是对于这个下单行为就没有对应的定义了啊这里边就是给了一个空字段,因为方便后面我们包装它的这个数据的样类类型,对吧?啊提取的时候给了一个空字段,然后最后还有一个就是大家非常熟悉的时间戳了啊,所以接下来我们在代码里边新建啊,已经知道这个数据长什么样了,还是新建一个object,我们带上包名com.at硅谷点当前是order。
12:50
Per pay detect,呃,当前我们是这个订单超时检测,所以是order timeout,先把这个单列对象先创建出来,没方法放在这儿,接下来我们首先要定义输入输出的样例类类型对吧?哎,所以接下来我们定义这个啊,定义输入输出样例类类型case class。首先这个输入的数据大家想到这就是一个订单事件嘛,不管是下单还是支付对吧,都是一个订单事件,所以我们直接叫它order event,前面我们也说了,首先是一个order ID对吧,一个长整型的order ID,然后后边是一个啊,一个type,一个类型啊,Event type string类型,然后再后边呢,是一个transaction ID对吧,一个那个交易流水号啊,这个是个string啊,因为那个字符比较奇怪啊,乱七八糟的一一个字符串。
13:51
然后最后还有一个是这个time stamp,一个长程形,直接把它定义好就可以了,然后后面我们做那个提取啊,呃,然后另外我们还得定义一个输出的样例类啊,大家可能想到这个具体来讲,我们这里边的这个具体应用场景应该是什么呢?其实如果要想最终的应用场景的话,如果要是分担这个业务系统的这个当当前的这个压力,我们应该是什么?直接就更改那个业务数据库里边那个状态,对吧?诶大家想如果要是这个订单超时了之后,我是不是直接连接到那个业务系统里边,找到对应的那个订单,直接改一下状态,改成对应的那个tout对吧?直接把它变成一个超时状态就完了,反正这样的话,我们业务系统就不用做任何操作了,那这里边因为我们没有业务业务数据库嘛,对吧,你这个还涉及到你得创建一个业务数据库,比方说有这个订单表对吧,然后里边有一个字段我们去改,大家可以想到这个操作,这里我们就不给大家做具体的操作了,只做一个什么呢?做一个报警。
14:51
信息的提示啊,就是相当于我只是告诉你对吧,输出一张信息告诉你,诶这个订单超时了啊,你去做一个操作吧,啊,或者说你去发一个通知也是可以的啊,这个大家下来之后就做其他的调整,做后续的处理就完完事了,我们这里边大家主要是了解flink的处理流程啊,那这里边我还可以多把把它这个定义的更加的广泛一点,就是我不要只报警,就是我输出一个什么信息呢?就是当前的这个订单到底是支付成功了还是超时报警了,我都输出一个结果,那就相当于是一个订单检测的结果,对吧,相当于我们这里边是跑的是一个订单状态的检测结果啊,所以定义一个这个就叫做order result,那当然order result嘛,需要有一个order ID还是一个长整型的order ID,另外就是需要有一个到底是什么状态了,对吧,我定义一个result message string类型好定义出来就可以了。接下。
15:51
看来里边的处理流程大家会发现完全类似,跟之前差不多对吧?还是我们做这个流处理,调用这个data stream API,引入当前的影视转换,然后接下来环境里边呢,我们设置一些基本的还是全局线射程并行度是一,最后结果正确啊,然后后边我们给一个这个还涉及到这个时间特性,时间语义,我们现在还是给一个事件时间,因为大家发现里边有时间戳嘛,诶那你接下来要去判断当前这个,呃,就是是否超时的时候,也得按照它,就是真正下单的那个时间,对吧?按照那个时间来来去定义它是否超时,而不要按照我们这个系统处理的时间,所以还是事件时间语义,那接下来就是读取数据了,对吧?呃,这里边,呃,这个我们不要叫这个第一步,我们叫第零步吧,因为后边我们CP里边大家定义的第步一般是认为是先创建那个模式,对吧。
16:51
定义模式,所以接下来我们首先是,呃,这个从文件中读取数据啊,这个我就大家都已经熟悉,我们就简单的写一下吧,Resource,先念一下get class.get resource resource resource里边给一个当前的相对路径,就可以order log传进来。
17:17
然后接下来,哎,我直接定义一个,我不用那个呃,Input stream了啊,我直接放在一起了,我这个叫做order event stream,就包装好的这个order event对吧?哎,这里边我直接从环境里边read t,呃,Text text file读取文件,我要的是当前resource的get pass对吧,把这个传进来,然后接下来是不是直接map就可以了啊对吧?读进来之后直接map,那这里边每一个data都是先做一个切分用,因为是CSV文件,所以我们用逗号直接分开包装成样例类,输入样例类order event前面是order ID a瑞A瑞零,这里的接下来是要转成一个长整型图了,然后RA1接下来是我们的那个,呃,其其实一就是我们的那个类型,对吧,到底是create还是这个就不需要了啊。
18:17
啊,然后ARA2是那个transaction ID,它本身也是string,也不需要,哎,最后是还有一个对,这是一个长整型图了,转换过来,然后接下来那当然就是分配时间戳和water mark对吧,到底是升序还是乱序,哎,那我们接下来又得看当前的这个事这个时间了啊那时间我们看一下8428438,呃,这个这个看起来都按照顺序排列好了,对吧?所以这里边已经是一个升序时间啊,那这这里边就没什么复杂的了,非常简单,直接用声序的定义啊,Sending time stamps,然后接下来呢,用当前的这个time stamp,对吧?呃,传一个这个时间戳的提取器,注意当前还是秒对吧,大家看一下啊,当前这个还是一个秒,所以我们要再乘以1000把它写进来,这这就是前面要做的这个操作,然后后面大家注意,就是我们如果后续要做这个CP啊,做这个。
19:17
定义patternthon之后,应用到当前的这个get stream上的话,还要做分组,对吧?哎,那我可以不那么麻烦,我在这里边直接把它定义成分组之后的那个hit stream不用完了吗?那这个也是没问题的,对吧?我直接做一个K呗,按照什么做KBY有两个ID,哎,用order ID做一个K,当前同一个订单才去判断他的下单和支付行为,对吧?你不是同一个订单,那你就不要混在一起去做操作嘛,哎,所以这就是我们前面要做的这些操作,这框架先放在这儿。
我来说两句