00:00
我们已经了解了在CP当中怎么样去处理超时的事件,那接下来呢,我们来看一个具体的应用场景,那还是针对电商的应用场景,我们来考虑这样一个需求,我们想到一般情况在电商平台里边,最终创造利润的肯定就是用户下单购买的这个环节了,那所以一般情况呢,我们要使用各种方式去促进用户下单购买,那用户下单的行为呢,可以表明当前这个用户他真的是想要买这个商品。但是呢,现实当中就不是每次下单他都会被用户马上去支付。有可能用户拖延一段时间之后,他的支付意愿就降低了,有可能就忘了,诶那这个时候呢,我们就应该要让他有一点紧迫感,要督促他去尽快完成这个支付的环节,所以一般情况在平台里边都会设置一个订单支付的超时时间啊,那另外还有就是说正常情况下啊,下订单之后,那是要锁定库存的,这个过程肯定不能无限的等待下去,那如果他不支付的话,我们这个库存相当于就一直被占用,别人就没有办法去下单购买了,所以总是需要有一个订单超时时间的。
01:14
所以接下来呢,我们就来考察一下怎么量去检测目前我们的海量订单数据里边哪些已经超时了,我们知道一般情况在实际应用的时候呢,这个功能应该是由业务系统去完成的啊,那如果说数据量非常大,如果是海量数据的话,那有时候这个压力其实是非常大的,所以我们可以考虑使用flink去进行一个实时的处理检测啊,那比方说我们现在在这个电商网站里边设置一个15分钟的订单失效时间,那就是说假如说用户下单之后15分钟没有支付的话,那么当前订单就会被取消掉啊,所以接下来我们来考虑一下怎么样处理这样一个场景。那现在我们还是首先来创建一下这个测试的数据啊,因为跟之前我们所创建的这个用户访问事件,因为就又不一样了,我们面对的应该是订单事件啊,那这个订单事件呢,呃,我们可以定义一个order event这样的一个样一类,那里边首先应该有用户的ID,到底是哪个用户,然后接下来呢,呃,他针对订单应该有一个操作,那就是。
02:24
有一个订单的ID order ID,然后接下来他的操作应该有一个类型,就到底是创建了一个订单,是做了一个下单的行为,还是做了一个支付购买的行为啊,那所以我们统一把这个字段定义成even type,那对应的话我们可以有这个create,这就表示是一个下单行为,如果是一个配的话,支付的话,那就表示这是一个最终购买支付的行为。那最后当然还应该有一个time表示事件发生的时间戳。接下来我们自然考虑到了要检测的,那肯定是针对每一个订单,其实现在我们不是针对每个用户的行为检测啊,是针对每一个订单的行为检测,要看同样一个订单是否在create之后15分钟之内有配相应的事件,如果没有的话,诶,那我们就检测出来之后要做一个报警提示了。
03:17
所以在这个过程当中,我们发现啊。这个模式其实是比较容易定义的,那这个pattern就应该是首先begin这里的初始模式定义一个类型应该是create类型的事件,然后接下来呢啊,接下来我们会发现这个不是紧跟着,只要是在后边跟着就可以啊,因为在这个创建订单之后,有可能我们还会对订单进行修改操作啊,有可能做一些其他的操作啊,那这个过程并不影响之后只要有配事件啊,只要做了支付就可以,所以呢,我们这里是一个宽松性邻关系,来一个follow半。那接下来followed by,当然就是一个配时间了,另外我们需要设置一个超时时间,所以是WITHIN15分钟。
04:02
在这个过程当中,我们会发现这个处理的关键啊,我们要检测的事件其实并不是符合我们这些要求的完整匹配上的这些事件,因为完整匹配上的这些事件呢,那就是正常已经支付了的事件了啊,那这个其实我们就是只要知道他正常支付就行了,并不需要去做报警提醒。那什么事件需要做报警提醒呢?就应该是在15分钟之内有了创建订单行为,但是没来得及支付的那些事件,诶,这就是我们说的啊,有开头没结尾,它是一个部分匹配的超时事件。我们重点要检测这个超时事件,去做一个报警提醒。所以很明显,我们现在就可以使用cep里边处理超时事件的这个流程来对这个需求做一个实现。接下来我们就在代码里边具体来考察一下怎么样去实现这个需求。那首先我们还是创建一个scla object,现在我们要做的是一个订单超时的检测,所以我们把它叫做all timeout detect。
05:09
首先没方法写出来,然后我们还是创建基本的执行环境。Get execution environment,然后接下来把它命名成EV,对应的这个影视转换下划线先引入同样我们还是啊全局的并行度设成一,方便进行数据的测试。然后接下来呢,那就需要获取输入事件流了,我们当前的事件流呢,是一个订单事件流,哎,那所以首先我们还是要把这个对应的样例类定义出来,所以定义订单事件样例类。Case class我们这里可以把订单事件直接叫做order event。里边的字段,哎,那首先应该有一个string类型的user ID。然后应该有一个。Order ID,哎,那这个我们方便的话也是一个string类型吧,后边啊,那还应该有一个event type,就是类型,到底是创建了一个订单还是支付了一个订单,它也是string字段,最后还有一个time step,长整型的时间戳,这就是我们基本的定义,所以接下来就可以。
06:15
读取事件了。读取。数据源。因为哎,那这个我们还是直接from element啊,把测试的数据直接copy过来啊,那现在我们这个测试数据可以大概的看一下啊,同样还是有USER1和USER22个用户去做了对应的订单行为,然后我们看到user一下了ORDER11个订单,然后呢,下单之后做了一个更改,而且还做了一个支付,而USER2呢,他下了ORDER2和ORDER32个订单,其中ORDER3做了一个支付,ORDER2并没有支付成功啊,那所以我们看到后边有对应的这个时间戳,在这个时间戳里边,通过这里的这一个乘法的计算看的很明显啊,乘以1000这表示秒,再乘以60,这表示分钟,哎,所以这个是十分钟做了一个create 20分钟做了一个支付,那显然是在15分钟之内的啊,那前面这个AL1呢,第一秒做了一个create,然后第一分钟就直接做了一个支付,当然也是没有超时。
07:21
时的,所以这两个订单是成功支付的订单,而ORDER2呢,创建之后没有支付,当然就是会是一个超时的订单,接下来我们把这一部分数据先复制过来。那得到的这个事件流,我们可以把它叫做order event stream。接下来我们考虑到因为当前的操作跟时间有关,诶,那所以这里边我们既然是事件时间语义,那一定是要定义好怎么样从数据里边去提取时间戳,然后去生成水位线啊,那所以这里边我们直接assign,当前都是生序的啊,所以还是简单起见,Asign asending。
08:03
直接指定时间戳字段是time。三、接下来还需要做一个基本的操作,那就是。按照order ID做一个分组啊,因为后边我们肯定是针对每一个order去检测它的create和配饰件,注意这里是针对order ID,并不是针对用户ID啊,那用户他有可能下很多个不同的订单,所以我们这里给定的K值是order ID。接下来就是第二步,那就是要去定义检测的模式,定义一个pattern啊,那其实我们已经知道。这个pattern的定义。啊,先把这个pattern本身要做一个引入pattern调用它的必定方法,诶,那这里边需要给一个泛型参数啊,我们当前处理的事件当然就都是order event了,里边呢,直接给一个字符串类型的name,我们就把它叫做create吧。第一个事件就是创建订单的一个事件,那后边需要给一个对应的条件,我们这里就是一个简单条件where,判断它的even type是否等于create。
09:09
这是第一个事件,然后接下来啊,我们说只要后边跟着一个支付事件就可以啊,中间我们可以比方说这里啊,ORDER1可以有一个moify,这个是完全没有问题的,所以接下来呢,是一个宽松径离的组合条件,那就是followed by。后面给一个name,比方说这个就叫支付pay啊,那当然了,Where条件对应的even type也就应该是pay。这就是我们定义的两个事件先后发生,另外还需要得有一个限定的时间条件,那就是在当前的判断基础上指定一个within,啊,Within这里边我们看到需要传的是一个window time,所以它本质上啊,这里边使用的其实就是我们在窗口里边定义的那样一个类似的一个时间窗口。所以呢,里边我们所要引入的当然就是time,诶这个time类要引入的也就是flink stream API window in time.time哎,这个要引入的就是窗口里边的time类型,我们现在是15分钟,那就是MINUTES15。
10:14
这样的话模式就已经定义好了,然后接下来我们就可以第三步操作。那就是将。模式。pattern应用到。事件流上。啊,那我们之前的这个事件流已经做过KBY了,那就不用再做进一步操作了啊,就把order stream直接和这个pattern做一个结合就可以了,那我们这里调用的是cep。要把这个引入CP.pattern。传入order event以及pattern。得到结果,我们可以把它叫做pattern string。那么这个p string接下来就要进行事件的检测和处理了,这里边我们要检测的事件呢,呃,可以去检测到当前完全匹配的成功支付的事件,也可以去检测部分匹配的超时事件,哎,那所以这里面我们想要去做的是检测。
11:16
匹配事件和。部分匹配。的超时事件。那接下来我们就可以基于pattern STEM去调用一个,哎,最标准的啊,官方推荐的做法,那就是直接调用一个process方法里边,哎,那我们这里边就需要对应的啊,实现一个pattern process方式了,这个pattern process function呢,我们本身还需要去实现一个接口,就是所谓的timeout partial match handle这样一个接口,哎,所以这里边我们干脆单独定义吧,我们给它一个名字,这个就叫。Order pay这样一个检测。我们把它单独的定义出来。因为我们当前得到的结果应该都是正常支付的事件啊,那所以这里边我们可以把结果叫做。
12:07
配的order stream对应的剩下的还有一条流,应该就是测输出流,要捕获的是部分匹配的那些超时事件,哎,那对于这条流呢,我们就应该调用这个data stream的get set out put方法,里边要传入一个当前测试出流的标签啊,我们这里可以直接new一个output ta。这里传入一个对应测输出流里边数据的类型,哎,那这里面我们直接是STEM就好了,直接输出一个对应的报警信息嘛,那然后里边需要有对应的一个ID,就叫做timeout。那么得到这个测输出流之后,就可以直接做一个打印输出了。这条流我们给一个标记,叫做胎帽子。啊,那对应的本身paid order STEM,它也可以自己做一个打印输出,这本身是已经支付成功的,对应的所有的匹配事件。
13:02
啊,那最后完整的处理流程,因为要执行起来。这就是我们整个的处理流程,接下来的关键核心就是要实现order pay detect这样一个自定义的函数类了。所以接下来class。Order pay啊,那接下来extend,我们要实现的是一个pattern process function。那它有两个泛型参数,一个input,一个output,我们现在真正要输入的数据当然还是order event,那它的输出呢?呃,其实这里边我们主流的输出也就是检测到匹配的这个事件呢,处理的结果可以跟测输出流的报警信息类型不一样,那这里简单起见,我们还是直接用string做一个输出吧,就输出一条消息啊,提示一下当前已经成功支付了,这样就可以,这还没完,接下来呢,我们要处理超时事件,必须还要实现一个接口,那就是with。Time的out partial match handle了啊,那我们看到本身它也需要有一个泛型,那它的泛型是什么?当然就是应付的啊,传入进来的,我们当前处理的输入数据的类型。
14:12
里边必须要实现一个process timeout match方法。那这里我们给一个。Order event。里边现在我们必须要实现的抽象方法就有两个了,一个叫做process match,这是要处理正常支付的匹配的事件,那另外还有一个叫做process time out match,那就是部分匹配的超时事件,那他们匹配到的数据肯定都是放在这个map里的啊。所以接下来我们先来处理这个正常匹配的事件吧,我们可以注释一下,现在是处理正常支付的匹配事件。所以这个时候我们可以直接把当前已经支付了的这个事件拿出来,叫做pay event啊,那我们直接就可以从这个map里边去获取一下,那我们可以直接get对应的K是什么呢?哎,那现在既然是一个完整的正常匹配嘛,那当然这里的create和配两个个体模式对应的匹配事件就都是能够捕获到的,那所以我们直接get配这个字段啊,配这个K。
15:19
就可以捕获到对应的支付事件,那这里边当然拿到支付事件还是放到了一个list里边,只不过我们现在就只有一个值嘛,所以GET0就可以了,我们没有定义循环模式。接下来呢,哎,那就可以直接做一个输出了,现在要调用的是collector.collect方法做一个主流的信息输出,哎,那这里我们可以直接做一个模板字符串的输出。现在输出一个订单,后面加上订单的ID啊,那我们这里可以用pay.order ID跟在后边,那接下来我们可以说它以成功支付。这就是一个提示信息,非常简单的一句提示信息,然后接下来呢,呃,我们还得处理超时事件。
16:06
处理。部分匹配的超时事件,那同样这里边我们可以定义想要捕获的一个事件,注意这个事件呢,如果再去捕获pay,当然就捕获不到了,因为我们当前部分匹配吗?那很显然当前我们只有两个事件,肯定就是只来了一个create事件,没有等到对应的配事件,所以我们不能去获取配相应的那个值,如果直接获取的话,得到的肯定就是一个nu,所以我们应该获取的是一个create事件。我们把它叫做create event啊,那基于这个map去get的对应的K也是create。同样当前例子里边只有一个值,我们GET0就可以把它捕获出来,接下来呢,哎,那同样我们可以输出一条信息。只不过这里需要注意的是,现在不是用collector去输出了,而是要使用上下文。context里边有一个方法叫做output。
17:06
测输出流的奥的方法呢,对应的应该传两个参数,一个是测输出流对应的那个标签,另外还有一个就是要输出的数据啊,所以当前我们这个标签呢,呃,因为这个函数类我们放在外边了啊,所以不能直接使用我们在呃之前没方法里边定义的对应的变量啊,那所以我们就单独的把这个再COPY1份啊,因为我们知道ID和类型就可以唯一的确定当前的输出标签,哎,所以我们直接把这个复制一份放在这里。这样的话,对应的特殊出流就已经指定好了啊,那当然了,这里边我们不应该用pay event,而是使用create event获取一下它的order ID,另外里边的输出信息我们也应该做一个更改,那就是它是超时。未支付。哎,那对应的这个用户,我们也可以输出一下用户为当前的用户,那就应该是create event里边的user ID,把这个拿出来就可以了,这就是我们检测部分匹配的超时事件,然后进行报警信息输出的一个过程啊,那当然了,接下来得到的这个数据流啊,我们一部分这个主流里边的数据,这是正常支付的匹配事件,那输出到了配这个打印结果里边,那对应的这个超时事件呢,我们是通过获取测输出流,然后打印到了开out对应标签的这个流里面。
18:30
接下来我们可以直接运行,看一看测试的结果到底怎么样。跑起来之后,诶,我们可以看到输出的结果跟我们预想的是完全一致的,订单一ORDER1和ORDER3已经成功支付,他们都是在15分钟内支付成功了,而订单二呢,有create事件,没有支付事件,所以它是超时未支付,而且明确的报警提出了是USER2,这个用户可能会有问题啊,那所以这就起到了对于特定事件超时事件的一个检测报警的功能。
我来说两句