00:00
我们看下一个模块,我们想要实现的这个需求是订单支付的实时监控,那这个模块里边主要就涉及到了在电商网站里边啊,大家想这个有两个环节,其实对于这个风险的控制啊是最为关心的,一个就是前面我们讲的用户的登录,因为它跟用户账户的安全息息相关。另外还有一个环节就是。就是订单的支付,因为它是跟我们的真正平台的营收收入是直接关联,直接挂钩的,所以这个环节如果要是说有一些异常,有一些状况的话,我们应该要实时的把它检测到,然后做一个报警啊,当然就是说订单本身是我们可以去做一个下单的检测,就像我们前面讲的刷单,如果说有刷单行为的话,直接可以检测,那现在呢,我们讲的是订单和后续的这个支付行为关联起来的一个检测,首先我们能想到的一个需求就是。超时失效,哎,就是我们所说的订单下了之后,我们难道就一直等着,等用户支付就行了吗?就挂在那儿,呃,这个他想什么时候支付什么时候支付吗?一般的网站都不会这么去设计,大家想想为什么不会,就是直接放在那,你想什么时候支付,什么时候支付就完了呢?那首先就是说,诶大家想就是你如果放在那儿的话,那用户就不着急支付了嘛,那家想就是甚至有可能出现这种情况,我如果对这个商品比较感兴趣的话,我不管怎么样,我先下单,先把它买了再说,然后呢,我就不支付,就等着呗,对吧,等到我真正想要的时候,我再去做一个支付,那这个就会失去意义了,这下单是不是就跟购物车一样了?哎,所以说从这个功能上来讲,是不应该这么设计的,你如果给一个超时时间的话,诶倒计时在那看着,过一段时间之后,这订单就失效,那么用户就会有一个急迫感,他就觉得诶这个我马上要支付掉,对吧,而且另外从业务逻辑上来讲,大家想。
01:57
如果说用户一直占着这个,下了订单之后,然后不支付的话,那他家想是不是相当于我们在库存里边,这个商品是要库存是要减一的呀。
02:09
诶不能其他用户就不能在下订单买这个商品了,对吧?诶所以说在这种场景下,如果说一直不失效,这其实是不正确的一个一个做法,就好比大家买票的时候,像幺上这个12306啊,你说如果说要没有超时时间的话,那大家那个抢票的时候,那肯定就是先占着之后我不支付嘛,那这这种情况下,别人是没有办法买这张票的啊,这就会造成很大的这个呃效率的低下啊,很多人也是这个,我们业务系统就会出现这个中断的情况,所以所有的平台基本上都是要有一个订单超时的一个处理的,那传统来讲的话,大家想这个功能应该怎么实现呢?传统的企业里边,那订单既然要超时,那一般情况就是下了订单之后,用户下了订单之后,这边我应该在数据库里边有一个记录,对吧?然后我可以比方说有一个轮询的额外的一个任务,不停的去查询当前的当前的这个订单的状态,对吧?然后我检测到如果有一些订单,它的那个下单时间跟现在的这个系统时间相比,已经超过了我设置的超时时间,比方说15分钟对吧,或者半个小时已经超过了,那我就直接把这个订单的状态制成一个失效,哎,这是常见的一个一个处理方式啊呃,那大家会发现这种处理方式呢,这是不是相当于业务系统的这个,它本身要做的事情就会比较多,比较麻烦呀。
03:36
啊,对吧,他他自己就得去实时的去做监控,就得查我当前的这个订单是不是失效了啊,那另外还有一种比较好的优化方法,就是大家知道在release里边是有对应的那个失效时间的,对吧,我可以设置一个K的失效时间,所以呢,我可以直接把对应的这个状态,或者说当前这个订单的这个KR对应的这个值直接保存到re里面去,然后呢,就相当于我那边它是自动失效的,我这边在查这个订单的时候,假如说接下来我用户再对这个订单有这个操作需求的时候,我查订单状态的时候呢,到release里边去判断一下,然后我发现如果那个re release里边已经没这个值了。
04:18
已经超时失效了,那是不是我现在就应该。就是把他当前的订单状态制成一个失效,然后去呃返回一个,就当前这个订单不能操作了,对吧,是一个失效的状态。那大家想一下,这两种方式分别有什么问题?首先第一种方式就是我们说的实时,确实是实时的去监控,对吧,不停的在刷新当前订单的状态,但是对于业务系系统来讲,对于对于业务数据库来讲,这个压力就非常大,对吧?我随时都要监控啊,当前这个订单是不是超时了啊,那如果说放在red里边自动失效,然后我用到的时候再去取,再去判断的话,这个确实是业务系统的压力小了,但是它的时效性好像就没那么好,对吧。
05:02
那就是说我必须得用到的时候,我再去找,再去判断,就不能是它一失效了之后,我立马就有体现,就相当于我这边那个呃,对应的那个数据库的字段就改了,对吧,就马上就变成了一个失效状态了,那如果说我们想要做到立刻就生效,相当于是不是我还得有一个另外的轮询程序,不停的查对应的那个数据库里边的,呃,那个里边的那个状态啊,哎,所以这个过程的话,就是在实际的应用过程当中,其实还是有一些问题的啊,那大家自然就想到了,那有没有解决方案呢?特别是像现在,如果说我们这个短时间内订单数据量又特别特别大,那大家想这个东西你全让业务数据库去做的话,压力太大了,对吧?啊,这个其实是一个额外的压力,而且说这个数据量是可能是很大的,所以我们自然就想到了,那能不能用大数据处理架构去承接这一部分功能呢?之前我们如果只用大数据做这个离线处理的话,那是做不到的,因为时效性不行嘛,那你这里边本来我们那边就是想实时知道当前这个是否失效的一个状态,你如果这里边又用了一个离线处理啊,直接是这个隔很长时间之后才判断出来这个状态的话,那我们那边订单用户那个订单马上就要做其他操作的时候,就已经判断不出来了,对吧,那可能有一些这个操作就异常了,所以我们这里边可以选择的一个东西就是啊,之前我们那个离线操作的话不行,但是实时操作可以啊,所以现在我们有了flink这样的实时处理的流式处理大数据处理框架,就可以解决承接业务系统的这部分功能了。
06:43
啊,接下来我们给大家来讲一下怎么样去做这个订单失效的实时监控啊,那所以接下来我们还是在代码里边先把这个模块先创建出来,新建一个module,然后接下来当前的rif ID,我直接就叫做order pay,就是订单支付的一个状态的检测,对吧,不管是失效还是后边我们要讲的那个双流匹配啊,Draw影对照,呃,都是跟这个订单支付相关的,所以是order pay detect把它先创建出来。
07:19
然后在当前的这个po文件里边,其实大家想到既然你要做这个检测,前面我们也发现了,是不是最容易做这个复杂事件处理检测的这种方式就是CP啊,哎,所以我们直接就把之前这个logging field里边这个CP的这个依赖直接导进来。放在当前的泡沫文件里。呃,然后接下来呢,我们就是在。当前这个模块下边啊,要去做这个处理了,Order pay对吧。还是我们的代码主要是放在source main Java下边,呃,Resources下边要放一些数据文件,那我们这里边的这个数据应该长什么样呢?
08:03
大家想我们现在要处理的数据,其实就应该是买点日志里边是不是应该有用户,用户下单和用户支付一个订单对应的那个数据啊。哎,所以接下来我们还是节选这个user behavior里边的数据,但是非常可惜,我们本身的那个user behavior里边只有一个败操作,那大家想这是不是就没有下单和支付两个操作分开啊?哎,所以在这种场景下,我们就只好单独再去找一些测试数据了啊,那我们现在测试数据放在了这个order log里边,我把它直接copy过来,放在当前的resources下边。那现在大家看一下当前的这个order log里边到底有哪些东西呢?这里边其实主要是也是CSV文件啊,已经作为ETL之后的,所以这里边其实就所有的数据都是用户对某个商品,呃,那不一定是某个商品,因为大家知道一个订单里面有可能有多个商品,对吧?呃,都是用户对呃某个订单做的一个操作,但是大家看到这里面好像只有一个类似于编号ID一样的东西,那这个ID应该是什么呢?
09:15
大家可能想那user ID嘛,但是大家注意这里边我们单个如果判断这个订单行为的话,最重要的是判断同一的同一个用户,还是要判断同一个订单呢。其实接下来我们是要看当前这个订单创建了,然后订单到底有没有被支付对不对,我们要关心的是不是就是订单的行为啊,对吧,所以这里面其实跟这个用户到底是哪个用户都没关系了,所以这里面我们ETL出来的这个字段其实是其实是订单的ID对吧,Order ID。啊,所以这个大家要稍微区分一下啊,因为做过ETL,所以说前面那个用户ID都已经直接被绿掉了啊呃,只剩下了一个订单ID,然后接下来下一个操作,这就是当前用户的那个行为类型了,你到底是下单创建了一个订单,还是还是支付了一个订单,对吧?啊,当然中间可能还可以有其他的操作,比方说修改了一个订单modify,这也是一个行为,对不对啊,用户那边做一个行为,我们日志里面都会有体现,然后下边呢,大家看这里边还有一个字段是空的,有些地方有这个字段,有些地方没有,那这个字段是啥呢?
10:26
我们看到create这里面就没有,然后如果是pay的话,是不是这里面都有啊,所以它其实是一个。就是我们支付完成之后的那个交易码,对不对啊,就是第三方支付平台给我们返回的那个交易码啊,确认当前支付发生的那个码啊,所以这就相当于是一个transaction ID啊,那这个现在我们可能没什么用,只要知道它支付就行了嘛,然后后边还有一个就是时间戳了,所以基于这样的一个数据结构,我们是不是还是在Java下边。先去把当前的这个呃,当前的这个po类先定义出来,对吧,带上包名啊,com.at硅谷点当前order pay。
11:12
啊,我当前这个就直接叫做病啊,当前这个就直接叫做order event订单事件。好,把这个先创建出来,我们现在需要的这个字段,首先是不是就是一个啊,这是长整型的啊,长整型的一个order ID,订单IDUID已经直接被过滤掉了,不要了啊,然后接下来下一个是string类型的一个行为类型,对吧?当前用户的那个到底做了一个什么操作啊,是下单还是支付,还是修改,呃,我把这个叫做呃,Even type吧,或者叫那个行为behavior type,对吧?Even type事件类型,然后接下来我们还有一个字段是string类型的一个交易码,对吧,Transaction ID,最后还有一个长整型的时间戳time step啊,这就是我们当前能够拿到的所有的字段,接下来把这个构造方法,空参的带参数的全写出来。
12:15
Get center也是自动生成,最后再去自动生成一个to string,这就是我们想要定义的这个当前的数据源对应的那个pole类型啊,啊,然后那大家想我们现在这个数据都来了之后,接下来是不是就是在这个flink里边读取数据去做检测啊啊大家想我们检测到其实就是什么呢?我要检测create,如果一个订单有这个下单的时间create了,接下来我是不是就是要等他的那个配支付时间到来啊。那大家想我接下来是不是就可以设置一个就就类似于我可以设置一个这个延迟时间对吧?啊,或者说我们可以定义一个定时器,类似于这样的一个东西啊,然后是不是就是等15分钟后边只要15分钟之内来了配事件当前是不是就匹配成功了啊,所以这个就是成功支付,那大家想如果要是就是什么样的情况是出现这个支付超时呢?那是不是到15分钟的时候还没等到对应的配,这就是支付超时啊,啊所以接下来我们其实用这个flink是很容易能够实现这样一个检测的。
13:25
那我们是不是在flink里边只要检测到对应的这样的一个超时的事件。我就可以直接把这个输出一个报警信息啊,啊,我们在这儿就直接给大家输出一个报警信息好了,这个报警信息呃,也是非常简单,最简单的一个方式就是直接来一个order ID,然后输出一个信息,说当前这个order失效了,对吧?哎,所以就是非常简单的一个信息啊呃,那不光是失效,超时失效,可以有对应的一个这个输出,就是正常匹配的话,我也可以有一个输出,对吧?所以这里边我干脆直接把这个叫做不要叫报警了,我直接叫做order result吧,不管是正常的结果,正常支付的结果,还是报警的结果,都叫做一个order result,那这里边其实就两个字段,一个是最关心的,应该就是哪个订单长整型的order ID,对吧?然后另外还有一个字段,就是是不是就是当前的那个result呀,对吧,Result,呃,State。
14:28
结果状态啊呃,那下面我们还是把对应的constructor空参和带参数的都创建出来。另外下边这个get set也要创建出来,还有就是对应的to string方法自动生成啊,这就是我们能够想到的需要的这个输入和输出的po类型,然后接下来我们就看一下对应的这个代码到底句怎么写对吧?那同样还是在Java下边啊,我们新建一个class,带上包名,com.at,硅谷点,呃,Order pay detect,然后当前我这个就叫做al,呃,我要检测的是它的那个超时对吧?所以我直接叫做order pay timeout。
15:25
然后接下来的这个主体流程,其实跟之前是差不多的,我们首先这个PSVM。上面这里边要throws一个exception,接下来当然是首先创建流式的执行环境,Get这个环境啊,Env定义出来,哎,那同样我们还是现在里边有时间戳,那不是肯定要设实线时间语义对吧?啊,这个是自然能够想到的,然后接下来呢,全局并行度先设成一好,那接下来首先第一步我们是不是要定义就是读取这个当前的数据对吧?读取数据并转换成po中类型啊,那这个过程的话大家都轻车熟路啊,我们从那个文件里面读取,利用当前的这个类的反射啊,点class,然后我get resource,这里面是不是直接传一个相对路径就可以了。
16:22
Copy这个pass啊,得到一个resource。呃,这个我可以把它叫,呃,直接就叫resource啊,那下边。接下来得到一个data stream,我们想要的那个类型,是不是就是包装好的all event po类型啊,啊,那当前这个我就叫做all event stream,对吧?Data stream也可以啊,这个无所谓。Env,然后直接text file,这里边要传的是resource.get pass,把这个路径传进去。然后接下来这只是一个string类型的,呃,读进来的一个数据流嘛,那接下来是不是做那个map操作,这个过程还是一样啊,每一行先切分成一个string类型的数组,这里边我还是叫做每一个字段叫做fields,里边line去split,现在是CSV文件逗号分割,所以逗号做一个切分。
17:24
接下来是不是就是return就可以了,Return一个,你有一个order event里边的字段,大家再回忆一下,首先是order ID,那是不是长整型啊,FS0。然后接下来第二个是呃,我们定义的那个事件类型对吧,是下单还是支付,那那个是一个字符串了F1,然后再下一个是一个交易码transaction ID也是一个字符串F2,最后长整形的时间戳new,一个long fields3。好,这就是我们定义的一个过程,后边是不是再做一个ign times Sam and rock啊,那大家要看一下当前的这个数据到底是什么样子的呢?呃,看这个到底是乱序还是升序啊,4243哦,大家看这个的话。
18:16
基本上升序对吧?啊,基本上都是按照这个时间顺序一个一个增大的,所以这里边我们直接就给一个升序的,Ending time stamp extra,这里要提取的是element点。Get time Sam,这里我们是不是还要再乘以1000啊,因为前面大家看到这是一个秒对吧?啊,所以前面这个过程大家都轻车熟路啊,所以这个就是做到这一步做完就可以,然后接下来我们要把这个整体的处理流程列出来了,首先是不是第一步要去我们用CP来实现CP简单一点嘛,首先是不是定义一个。哎,接下来是带。呃,这个时间。
19:03
限制的带时间限制的一个模式,对吧?我们要定义的是这个东西啊,所以当前我可以直接pattern,大家还记得这个调用过程吧,直接pattern.begin然后前边是不是指定当前的那个类型啊啊,当前那个数据的类型啊,我们当前都是order event已经包装成po类型啊,那当前begin的话,大家想我首先是不是应该是一个创建订单的过程啊,所以我直接就叫create吧,或者大家叫那个下单对吧?呃,叫什么都行啊,就叫create。那么接下来他去提取的时候,条件where应该给个什么,是不是应该给一个啊simple condition啊里边是不是就是直接提取当前的那个操作类型必须等于create就可以了呀,啊所以这里边我直接用create.equals当前value的是不是get even type呀,做一个判断,对吧?啊,当然这里边大家看到这个simple condition啊,这个它是不是也是继承自ative condition啊,啊对吧,就是它本身也是继承自这个可迭代类型的,然后这个可迭代类型里边特殊的就在于是不是有一个ctx,有一个上下文啊,所以我们说这个上下文里边能干的事情就稍微多一点啊啊这个大家如果用到的时候用到的话,再说我们现在用不到简单条件就够了,那有了这个begin接下来。
20:35
我把这个给大家分开啊,这是第一步,然后接下来是第二步,第二步那是后边要跟着一个支付的事件,对不对?呃,就是订单创建之后要支付,那家想我现在这个就有一个模式的选择了,到底是宽松禁灵还是严格禁零呢?哎,之前我们那个连续登录失败,中间不能插入别的,你插入成功那就不对了,对吧?那现在我们中间能插别的数据吗?可以啊,你下订单之后是不是可以猫地犯,可以修改啊,对吧?可以做一些别的这个订单的处理,然后再去支付对吧?诶,所以接下来我们是可以中间插东西的,所以是宽松性令follow by,那这里接下来是一个支付事件,我就叫做pay,这是一个名称,Where里边还是给一个。
21:24
给一个你有一个simple condition,那大家知道这里面应该是什么呀?是不是应该是要求它的类型必须是pay啊,Pay equals,当前value.get even type对吧?所以这就是先发生create后发生pay嘛,非常简单,我们的要求是怎么样,是不是要在within,是不是在15分钟之内啊,比方说我们要求啊,订单超时时间15分钟,那所以直接给一个time.MINUTES15对吧,这就是Python定义。前面我们把这个对应的变量也声明出来,当前这个就叫做order pay pattern,这里大家注意我定义的是order pay pattern,因为我这里面检测的是啥呀。
22:15
是不是检测的是成功支付的这样的一个15分钟之内成功支付的数据啊,诶,那来想我们重点其实想要处理的并不是成功支付的数据,对不对?哎,那其实我们想要处理的是什么呀。对,其实是,其实也不是,就是呃,不没有成功匹配的那家想如果完全没有成功匹配的,我们也就不管了,对吧?我们其实想要检测的是什么呢?是create已经检测到了,但是对我在一直等配,但是呢,也不是说呃,这个配就就就绝对就没有匹配上是还在等,因为follow by我是可以后面一直等的,对吧,还在等的过程当中,结果到时间了,超时了,那这就有点像料到了开头没料到结尾的这种东西,对吧?匹配一半,然后超时了,我们需要检测这个,所以是不是后边我需要有一个超时的处理过程啊,那要超时处理过程是不是相当于得定义一个。
23:14
是不是得定义一个测出流的标签啊,对吧?定义测输出流标签,然后用来表示超时事件。哎,那所以这里面我们直接out for the tag。直接把这个定义出来,它对应的类型,那当然是我们输出的结果应该还是order result,对吧?不管是主流这个正常匹配的还是超时的,都是一个order result,我直接把它叫做order timeout tag,那么这里面我们要new一个out tag,呃,里边给一个名称,当前是all the timeout,然后后边我们加上这个画框,把它声明出来,对吧?这是我们定义的过程,好,然后这些该有的东西都有了,接下来我们是不是就是还是老规矩啊。
24:13
要在之前输入的数据流基础上,是不是要应用那个pattern得到一个pattern stream呀,对吧?在呃,将patternon应用到输入数据流上,得到pattern stream这一步其实大家知道就已经是相当于把我们呃想要得到的满足条件的那个事件就相当于已经匹配到了,对吧?已经找到了,但是还没单独的提取出来做转换,后边select才是做转换啊,那这里边我们就是来一个对。Cep ce.pattern对吧?调这个方法,然后前面我们要传的就是order stream,然后这里大家要注意,我们接下来检测这个create pay这个事件没指定,并没有指定是哪个订单,对吧。
25:09
那我们现在是不是要是所有的订单混在一起,Create和配来了之后都算吗?当然不是,是不是针对每一个订单啊,那所以前面这个流是不是也要做一个分组啊,哎,所以还是先做一个K,我们当前就是以order event的,是不是要以这个order ID作为一个键去做一个分组啊,然后接下来这样分组之后的流应用当前定义的order pay pattern对吧,得到这样一个pattern string,好,我把这个先定义出来,就叫做pattern string,好下一步第四步,那大家想接下来是不是就是要得到一个。最终我们检测到的那个流了对吧?啊,所以我们是调用。Select方法,然后实现,呃,对。
26:03
匹配复杂事件和超时复杂事件的提取和处理,诶这就是这样的一个最终处理的一个流程对吧?啊,那所以这里边我们可以得到之前的这个pattern stream,直接调select或者呃,Fla select方法对吧?这个都一样啊,我们这里边直接调select里边要传的参数,现在是不是就传三个了,首先把那个测出流标签传进去,Order timeout tag,然后注意首先要给的是一个对T,注意啊,大家看如果传三个参参数的话,先传的是一个pattern。Timeout方式,然后最后才是pattern select方式对吧?啊,所以这个顺序是这样的啊,首先是处理超时事件啊,那所以接下来你有一个我自己定义了啊,这个就叫做order time out select提取对吧,然后最后再来你一个order pay成功支付的一个提取select,这就是我们想要做这个操作,诶,当然了,最后得到的结果我们就把它叫做一个result strip。
27:22
这就是完整的这个处理流程,对吧?可以把这个select放在下边,大家看的清楚一点,那最后得到的结果大家知道result stream,如果直接打印的话,这个得到的应该是什么呀?这个是不是应该是正常支付的呀,对吧,Paid。Normally,对吧?这是正常支付的这样的一条流,那如果要是超时的,真正想要报警的那些数据应该在哪里呢?是不是应该在测输输流啊,哎,所以我就直接基于,诶,正好我们这个就是single output streamam ofer对吧,调它的get side output获取测殊入流的方法,这这里边是不是要传那个tag进来,诶还是我们之前定义的那个order timeout tag,然后接下来诶这个print啊,这个才是真正的timeout对应的那个订单啊,那这里边可以得到最后的结果,我们还要不要忘记XCQ的执行起来。
28:22
这个叫all the timeout detect job,对吧?这就是我们完整的一个处理流程。
我来说两句