00:00
讲完了在网站里边的恶意登录的监控,那恶意登录这一部分呢,其实并不是,呃,电商网站里边的一个特色,大家可能知道,在所有的网站,几乎所有的网站,只要涉及到用户,涉及到账户安全的,是不是都得做这方面的保安全保证啊,对吧?这这些方面的应用其实是比较广泛的,另外还有一个比较广泛的环节是什么呢?诶,那就是跟支付跟钱有关关系的环节,对不对,在电商里边特别明显。大家知道在电商平台里边,最终创造利润,最后最终有这个真金白银入账的,是不是就是最后的这个支付环节啊,所以在这个环节里边,我们对它的安全性,对它在业务逻辑上的设计考量可能就会更多一点。那比方说。啊,在前面给大家做这个项目介绍的时候,也提到了一个很容易想到的点,就是什么呢?我们得给这个订单设置一个失效时间啊,它的主要思路其实就是说,一方面我们在业务逻辑里边,你不能让一个用户他永远占着这个库存啊,因为如果一个用户下了订单,理论上我们库存就应该减一了,你不能说别人下了订单了,其他人还可以买这个东西,呃,这这没有足够的库存,你卖不出去的嘛,所以在这种情况下一直占着库存显然是不合理的方式,所以我们应该设置一个失效时间。
01:28
从另外一个角度讲呢,设置失效时间也可以有更多这个安全方面的保证,对吧?啊,这个防止这个啊,大量时间这个刷单,然后又又不去购买对吧?你这个如果从这个订单的角度去看的话,诶,好像这个这个这个商品短时间内好像就已经卖出很多对吧,很多已经已经这个库存已经减少了,但是这个其实是一个不正常的现象,另外还有就是说有一个时间限制,用户就会有紧迫感啊,就像我们现在在准备一些,呃,你准备一些考试,或者说有一个所谓的deadline的时候,那这种紧迫感就会比较强烈,对不对啊,就会催催促着你赶快去做支付,所以这个对于这个所谓的成单率啊,订单转化率其实是非常有帮助的一个逻辑,所以在网站里边,我们往往要做一个做一个订单失效时间的控制,呃,那大家可以想一下我们。
02:28
如果不用flink,不用这个现在给大家讲的这些大数据处理的框架去处理的话,传统应用我们可以怎么做这件事情,怎么做这个时效呢?啊对大家其实想到很简单的一种方方式,是不是可以把这个订单相关的那个数据直接放到redis里面去啊,对吧?放到redis里边的话,那本身那个key是不是可以设置失效时间啊,但是大家这个还要注意,就是说我相当于是不是还应该得实时的去刷新一下那个订单的状态啊,啊,所以这你可能得有另外一个应用去去那个检测当前订单的状态,如果要是那边已经失效,已经超时的话,那应该要把这个订单状态改变,然后可能要给用户有通知,对吧,要有一些前端页面上的改变,这是后续的一些操作,呃,我们之前大家比较熟悉的,可能也就是这样放到redis里面的一种操作,如果不放red的话,那怎么办?那可能就得你另外起一个任务,起一个服务。
03:29
去检测对不对,当前的这个订单,诶它它是否到失效时间了,当前这个时间有哪些订单应该失效了,把它的状态改变,这是大家能够想到的传统的数据库的操作方式,那这种方式有什么问题呢?这种方式其实很简单的一个问题,就是数据量特别特别大的时候,那我们可能想要去处理的这个东西,这个压力就会比较大了,对于数据库而言,本身这个操作耗费的资源就会比较多了,对吧?啊,所以在应用比较极端场景下的时候,我们可以考虑把这种检测,把订单失效的这部分机制可以放到大数据的处理框架里面来,那那这种处理是不是对实时性要求也比较高?
04:16
大家想哎,你不能说是我本来定定好的是15分钟失效对吧,或者十分钟失效,然后呢,你到15分钟的时候没有去通知你是一个小时跑一次定时任务,然后你隔了一个小时才发现它失效了,那你说你这个失效时间到底是15分钟还是一个小时啊,呃,这个对这个东西就相当于没有意义了,对吧?所以在这种情况下,我们是要用更加实时的方式去进行一个订单失效的检测,那flink是不是就是一个很好的选择啊,所以接下来我们就用这个flink实现一下实时的检测订单失效状态这样一个功能。具体来讲的话,呃,我们这个需求就定义成什么呢?就定义成在15分钟之内,比方说我们来了一组事件,来了一组,这个就是订单相关的事件,那大家会想到订单相关的事件,那应该有,呃,有这个创建一个订单下单对吧?呃,Create一个订单,还有支付一个订单啊,当然还有可能取消一个订单,我们现在关心的应该就是。
05:20
对,就是创建之后他有没有支付对不对,而且他们之间的时间应该在。对,我们定义一个时间,比方说15分钟之内create之后,如果15分钟之内没有没有配,没有支付时间的话,是不是相当于就超时了,我们现在要检测的就是这样的一个状态,那大家想一想,我能用什么方式来实现呢?呃,这里边首先大家想到这是不是又是跟时间序列相关啊,对吧,是不是前面应该是一个create事件,后边是一个配的事件对吧?前后的顺序相关,那是不是我们可以用CP把它做成一个事件序列,做成一个模式序列来定义出来。
06:06
然后我们去检测是不是符合,这里大家要注意,在这个例子里边,我们要去检测的是符合某种某种序列的事件,还是说不符合某种序列的事件。我们要检测的,如果要是create之后,15分钟之内有配,这个当然我们也可以检测啊,这个检测出来其实是。这个应该是正常支付的那种状态,对吧,这是正常的那些订单,就往往我们在这个过程当中,其实更关心的是对是超时异常的那种订单,对不对,那超时的怎么去处理呢。是匹配失败的吗?其实也不是,他其实应该是匹配了开头没有匹配到结尾对吧,就是我们料到了开头没有料到结尾这种情况,哎,所以在这种过程当中就要用到我们前面讲到CP里边还有一个特性是是不是可以提取超时事件啊。
07:09
所以现在我们要的是不是其实是那个超时事件,定义一个15分钟的话,最后可能要的是超时检测的那个序列,好具体来讲我们就在代码里边做一个实现。首先还是新建一个模块。这个模块我们就叫做order pay detect检测。呃,然后在这个po文件里边,我们应该加什么呢?大家想到如果要用CP的话,对,那是不是要把CP相关的这个dependency添加进来啊。好,我把这个先引入。然后接下来在这个里边语言文件肯定还是在main下边把它改个名。
08:01
Scale,呃,然后resources下边我们得放数据是吧,那数据是在哪里呢?哎,这个我们也有,这里边大家会看到有一个order log,这个数据就是我们在后台做做了买点日志收集到的,跟跟这个订单跟order相关的一些日志啊数据我们把这个先copy过来。呃,这里面大家会看到啊,这个这个数据定义的比较少一些测试的数据对吧,那大家看一下这里面的形式是什么样的呢?形式其实比较简单,前面这应该是一个。啊,有同学讲这是一个user ID吗?谁下的单对不对?这里大家注意啊,我们更关心的是是user ID还是是不是应该是订单的编号啊,对,这里边其实如果大家从日志里边提取,肯定是会有user ID的,但这里边直接就把这个过滤掉了,因为user ID其实这里边我们并不关心,对吧?我们更关心的就是订单本身的状态,所以这里边是一个order ID,就是一个订单的ID,然后后边是什么呢?
09:12
对,后边就是这个,呃,这个其实我们收集起来的应该还是用户的一个行为,对不对,对吧,用户对应的行为,那这是一个create或者是一个配对吧,常见的我们关心的就是主要就是这两个行为,然后后面这里还有一个字段,这个字段大家看create后面是空的,然后配这里呢,是一个比较奇怪的一个字符串,对不对?哎,这是一个,就是我们真正做支付了之后,是不是可以拿到一个交易ID啊,因为我们往往是要跟第三方支付平台去做交互,那这个时候你怎么样,之后我们怎么样确认他这笔交易是否发生了呢?是不是应该能够拿到一个交易ID这样的一个东西啊,类似于回执这样的一个东西啊,所以这个东西如果是create的时候,当然没有这个东西了,如果是配的话,是不是可以拿到啊,这个东西是可以拿到的,最后大家比较熟悉,这是一个时间戳,对,所以这个数据结构的话。
10:13
整体来讲就是这些,然后接下来我们就可以在skyla下边去创建一个object,这个我们还是带上包,名叫艾特硅古典order pay detect,呃,我们这个是检测订单的超时,我们就叫order timeout吧。呃,然后按照惯例,我们一开始先定一些样例类,呃,大家想我们现在要的样例类是不是就应该得是对应那个Outlook里边的那个订单事件啊,对吧,可以是create事件,可以是配事件,所以首先我们先定义输入订单事件的样例类case plus。
11:04
我们定义一个all。第一个字段,哎,我们说那个不是user ID,而是一个order ID,第二个字段是一个大家还记得这个第二个字段是一个对他的那个状态,或者是他的那个,如果我们认为这也是从用户的那个behavior那个日志里边提取出来的话,那相当于应该是一个行为的类型,对不对?或者叫事件类型吧,我们统一叫even type,这个也是一个string。后边还有可能有一个对交易ID啊,当然这个是一个string类型的。最后还有一个时间戳啊,我们叫time STEM或者叫even time都可以啊,这是一个long类型。除了定义好的这个输入事订单事件的样例类的话,我们往往还要想最后输出什么结果,对不对啊,那所以我们在定义一个输出结果的样例类。
12:08
那如果说我们要把这个所有的输出都包装在一起的话。那可能就是order result,那就是正常的话,我们可能输出一个正常的信息,如果要是出现异常的话,我是不是应该出现一个timeout的一个信息啊,对吧?这个其实也比较简单,我就直接把它包在一起吧,Order ID,这个比较关心哪个订单出现的状况,另外还应该有一个result的信息,Messger,这是一个string,我先把它包装好。然后接下来程序代码里边内函数,呃,这个就还是常规的一些操作,对吧,先创建。环境执行环境把这个引入,然后接下来配置一些,比方说先配置这个time character对吧,时间语义我们要用什么对。
13:05
在这种情况下,显然它带着时间托,我们要用的是even t,当然可以设一个并行度,设成一对吧,全局并行度射成一,方便我们看这个输出结果,然后接下来是不是就要读取数据啊?读取订单数据。呃,所以按照我们那种方式,我还是先定义一个resource get plus,呃,可以把这个get resource拿出来。复制一下它的这个名字好,拿出来之后就可以直接定义我们的流了,对吧?拿到我们的事件流了,All the event stream env,是不是text file对吧?resource.get pass对吧?把它拿出来,然后接下来是不是还得转换成我们想要的那个样例类啊,这个就就当复习啊,大家快速过知道怎么做就可以了,定义一个data ray。
14:18
这个数据是什么啊,还是逗号CSV文件对吧,逗号分割所以。Blue。还是逗号分割,所以我们可以直接把它先切分开,最后包装成一个定义好的样例类,那那这里边首先是一个order ID,那当然是它的一零。Trim之后再涂浪对吧。诶呃,这个就是一个,诶大家注意一下这里是string还是浪啊,这个应该是一个,应该是一个浪结构对吧,所以这里边我们还是把它定义成浪吧。
15:04
这个大家看具体的情况啊,当然用string也也可以,没什么问题啊,这里边既然前面这里看到都是树的话,我们就区分开都用浪好了,这里要出浪对吧,然后data ra1。这个所谓的even type到底是create还是pay,它就是一个string,那直接tri一下就好,二这是那个transition ID,这个是一个string对吧?啊,这个不用转换it RA 3tri值诶。To long,好,我们把它转换完成之后,下一步还要做什么?是不是还要去分配,因为是事件时间嘛,还要去分配time stamp和water mark,这里边我们用哪种方法呢?啊,那我们还是针对这个数据来了,对吧?看看这个数据是什么样子吧,哦,这个看起来比较好,这这个比较简单,我们可能之前已经做过处理,对吧?这里边的数据是升序的一个数据,所以这里线哎,有乱序吗?好像没看到是吧?下面有是吗?
16:25
892893,这个是升序的嘛,对吧?好,所以这里边我们就可以直接简单了,对吧?呃,我们把这个用升序时间时间出的定义,那是assign asending times,这个简单直接定义一个字段就可以对吧?哎,这个event event time,刚才那个是秒对吧,大家看清楚了吧,跟之前的定义一样,所以乘以1000,然后接下来。呃,大家会想到我要读取这个过程当中要不要K呢?我是所有的所有的数据来了之后都放在一起去做CP的那个那个处理,就是说create后面只要有配就行吗?还是说要根据是不是得根据你同一个ID create后面的配才有效啊对吧?不同订单的事件你就不要来凑热闹了,哎,所以这里边很显然我们应该KBY是不是order ID啊,这是我们先把订单数据读取出来,然后先做一些预先定义的这些处理,对吧?所以这里边是我们的第一步。
17:37
然后第二步,大家还记得按照我们之前CP的那种编程方式,第二步该干什么了?对,是不是定义一个pattern啊,定义一个匹配模式,当然我们所说的这里所说的模式就都是模式序列了,对吧,Order pay pattern。
18:00
呃,这里边大家还记得是pattern点把这个引入啊,pattern.begin对不对?一开始必须要以begin开头,这里面的数据类型当然都是all the event,给一个名字,比方说我们还叫begin,大家想叫start随便改,对吧?Where where什么呢?我们要筛选的条件第一个应该是。是不是必须是那个type要等于create这个才算是开始对不对,哎,我们计时也是从这里边开始的啊even type等于create,那然后接下来下一个,呃,那大家会想到那下一个我们是next还是follow by呢?是严格严格的精灵还是这个非严格的精灵呢?严格。大家想想在这个过程当中,我们这里边是只看到这个create跟配了对吧,那假如说这个订单我是不是还可以有别的操作,比方说修改了一下对不对,改了一下收货地址,或者说追加了一个什么我要什么什么要求,对吧?是不是很很有可能有这种情况,那这种情况后续再出现配是不是也可以啊,我们是不是只要他配了,是不是就就代表在这段时间内它真正完成了呀,所以大家注意啊,中间是不是可以有别的事件插入进来,那这里边我们要用的是一个对followed by。
19:31
这是一个就是所谓的宽松净灵模式,非严格的模式,那这里边follow by还得给一个名字,比方说这里我就叫follow吧,然后where,这个条件要什么?对,是不是还是type,它是必须要等于pay,必须create,创建之后要支付。另外我们还有一个约束条件,对,Within在一段时间内,对吧,这个是一个time。
20:04
呃,我们要15分钟,那就是MINUTES15,所以大家看这就是我们定义好的这个匹配的模式,非常简单,也非常明确,对吧,只要大家就是梳理好这个到底是什么样的条件,然后到底是什么样的关系,每一个个体模式之间,他们组合的关系是什么样的,这个其实很简单就可以定义出来。然后第三步是干什么呢?第三步是不是就要把这个模式应用到我们的那个get STEM上啊,对吧,把模式应用到呃,Stream上,得到一个patternent stream,对吧?大家还记得这个特殊的数据结构,我们定定义叫一个patternent stream。他要调用的是cep下边的pattern方法。
21:00
呃,这里边传的话就直接传,先把stream传进去对吧,Order even stream,然后再传一个order pattern啊,这两个一传进去就搞定了。这里大家注意,接下来就可以做什么了。是不是就可以select了,对吧?呃,调用select方法得的提取。事件序列,那这里大家要要注意的是,我们这里稍微麻烦一点,因为我们更加关心的是,我们当然也可以就是把正常的那个事件也输出对吧,就是正常匹配已经支付过的那些事件也也匹配出来,我们输出一个支付成功,但是我们更关心的其实是超时还没有支付成功的那些事件对不对,但其实我们也并不是说你随便来一个事件,不符合这个模式就完了,我们其实想要的是什么呢?是有开头没结尾的那些,就是begin,是不是这个create已经检测到了,但是到15分钟的时候还没有检测到配的那些事件啊,而我们关心的是这些,所以我们这里可能就要应用一个CP里边所谓的那个超时的机制了,哎,有些同学说我这里边可以用那个note followed by呀。
22:22
但是大家注意,Note followed by是不是不能作为结束的那个模式啊,而且note follow by1一般说的是什么?是说它不在两个事件之间对不对?而并不是说诶,它后边就没有什么事,我们现在要检测的是它后面一定没有什么事件吗?不是,我们要检测的是到这个时间点的时候还不知道对还没来对不对,你正常的话是不是还要等下去啊,其实这个而不是说他后面就没有对吧,这是两回事,所以这里边我们不能用note followed by,而是要用是不是按照定义了这个within之后,按照超时事件把它提取出来啊,所以这里边提取事件序列超时的。
23:10
事件。要呃,做报警处理,报警提示啊,那想到当时我们说这个超时这一部分是用什么方式呢?是不是得用out,呃,对,就是side output,对吧,得用测输出流的方式去获取,那在这个之前是不是先得定义一个output tag呀,所以我们定义一个all time out output tag。你有一个out。呃,这里边它的类型当然是order the event,对不对?呃,当然这里边不是order event啊,是我们定义好的那个all the result对吧?测输出流我们也统一都用这个数据结构来输出,那么给一个它的名称,我们就叫all the timeout。
24:11
有了这个type之后,接下来是不是就可以定义我们的那个result stream了,它要在patternthon stream的基础上,是不是要调一个select方法,这里大家注意,当时我们之前做那个连续登录失败检测的时候,是直接只传入一个select方式,把那个匹配到的序列检测出来就完事了,对不对?现在我们只传一个能行吗?你只传select function式是不是拿到的是符合标准,是不是在15分钟内成功支付的那些事件啊?那如果我们想要处理超时事件的话,是不是你还得传入更多的东西啊,这里边可以给大家看一下,在select里边有不同的实现方法,诶大家看是不是除了你只传一个select方式之外,是不是还可以再传一个什么传一个pattern timeout方式。
25:06
所以再传一个,这个是不是就指定了我们的超时事件放在哪里,然后你怎么样去做处理,对不对?所有的超时事件序列都会被保存在这里的map里边,我们再往后看还有什么呢?那大家会想我当时定义的那个output tag怎么用呢?哎,大家看是不是还可以用这个outut tag去做一个明确的指定啊,对吧?这里边可以是给定这个output tag,然后再传type out方式和select的方式。所以现在我们的实现方法就是。把这几个都要传进来,先传一个all the timeout outputook ta,先把这个ta传进来,然后接下来是不是就要去你有一个自定义的timeout t timeout风扇啊,呃,大家想到了,对吧,Order timeout select。
26:03
这样一个方式,然后接下来是不是还得去用一个,就是我们正常匹配到的对吧,成功支付的那些,再把它选出来order pay select方式啊,这是我们自定义的函数类啊,大家不确定的话,可以点到这里边来看是不是有这里边这样一个调用啊,可以传三个参数,一个是output tag,然后还有一个是自定义的一个t timeout方式。第三个参数是自定义的一个pattern select方式,对吧?啊,大家看这个跟我们前边看到的那种实现方式好像略有不同,那种方式是在哪里定义的呢?是在下边定义的。大家看在这儿这种方式是什么,这里边传的就不是一个。就不是一个我们的那个函数类了,对吧,你就不是去拗一个呃,自己定义的一个函数类了,而是直接在这里写一个自定义的函数对不对啊,那如果用这种方式,大家要注意这个调用方法啊,调用方法这里边用到的是这个函数颗粒化的这种方式对不对?大家看参数是一个一个写的啊,一个一个括号,而前边我们这里边的这种调用是不是相当于这是一个括号里边的三个参数啊啊,这个大家稍微注意一下啊,你调用的时候可能得用不同的方法,我们这里边就用这种还是函数类的方式实现就好。
27:30
然后接下来大家知道在这里边是不是result stream就可以去。直接输出,呃呃,这这里边我们是不是直接输出的话,应该是什么,是不是就是配的这个结果啊,对吧,已经配掉的结果,那如果要是说我想去拿到超时的结果怎么办呢?是不是还要。Get side output,这里要把我们对应的那个output传进去,拿到对应的测输出流的结果,然后再打印出来啊,这里边是timeout的结果。
28:06
这就是我们整个处理的流程,最后还要再执行一下,对吧,All the timeout。
我来说两句