00:00
现在已经首先读取数据转换成了想要的样例类类型,而且也已经批分组了,那接下来是不是就是CP的处理流程,首先先来定义一个模式,对一个pattern,然后这个模式里边大家要注意,肯定是需要有一个时间限制的,对吧?那最后我们是要有一个within的,好,那这里边我定义一个这个all the pay pattern,当前我们定义pattern的时候,引入CP里边的这个pattern,对吧?然后大家还记得定义事件序列的时候,必须首先是要,诶大家看这里边就没别的方法对吧?哎,就必须这里边只能是调这样的一个一定,然后呢,这里边它必须还要给这个当前的数据类型,我们当前是什么类型,就是order event这一类嘛,对吧?啊,然后里边呢,还必须得给一个当前的名称,一个name,当前个体模式的name,诶我们一开始这个就叫做什么,就叫做create不就完了吗?对吧,我当前要的就是create吗?
01:00
我直接换行啊,把它写在这儿,Begin create,哎,那你定义它叫create,这个没用,关键你还得选出是create那个行为的对应的数据,所以接下来我们需要有一个where操作,Where里边,哎,那我们用一个提取器,要的是当前的even的type,必须要等于create,那然后接下来大家就知道后边接下来需要有pay,对吧?哎,那这里边就涉及到一个我们当前的模式序列,是用这个严格禁灵呢,还是宽松禁灵呢?哦,大家想到诶你这里边如果说这个下了订单操作嘛,下了订单之后是不是订单还还可以改啊,啊对吧,有可能还可以有其他的一些操作,然后我再去支付,这个完全是有可能的,所以这里边我不需要一定要create之后马上就下一个,一定要是配啊,所以这里边我们宽松精灵,宽松精灵用follow by对吧?啊,那这里边给一个名称,比如。
02:00
常说这个就叫pay,对吧?接下来这个操作叫做pay,那同样我也需要给一个定义,当前的even type必须得是pay对吧?这里边我并不需要去指定那个ID,因为我们已经做了分组了,就只针对当前ID有效,对吧?后面做的所有操作都只针对当前的U,呃,这个al ID,这样就把它定义好了,后边大家不要忘记还有一个在多长时间内对吧?15分钟内匹配上这个才是有效的,所以我们定义一个类似于窗口的时间window,这个还是用这个window in time.time然后定义一个15分钟,这就是完整的pattern的定理,然后接下来是不是要把这个pattern应用到这个流上面去,对吧?将pattern应用到呃数据流上进行模式检测,或者说进行模式匹配对吧?
03:00
啊,那这一步也非常简单,我们得到的是一个啊pattern,就是一个pattern stream对吧?那这个pattern stream就要调用cep,我们用的是这个flink cp scale.cp啊这个object办成对象,然后调它的pattern方法里边呢,就把我前面前面定义好的这个order the stream流传进来,后边把我们定义好的order pay pattern传进来,对吧?诶这这样就匹配上了,非常简单,然后接下来第三步,诶第三步稍微又又不一样一点,之前我们第三步就直接就s select提取出来就完事了,对吧,现在是不是就是当然我们也可以关注这个主流匹配上的这个提取出来就是,呃,先做了create,后面又支付之后的这个正常支付的订单啊呃,检测出来输出这个,这也是可以的,但其实我们最关键最关心的是什么呢?是。
04:00
时报警的那个对吧?那其实我们最关心的是超时的事件,所以这里边我们涉及到一个超时事件的提取,那当时是不是说到是需要用到一个测试枢流啊,然后定义一个patternon timeout function去处理这个事儿,对吧?那特殊输流提取的时候就得定义一个测殊输流标签,哎,所以为了方便我后续使用,我先定义出来一个测输出流标签,这个就多了一步测输出流标签用于呃这个处理超时事件,所以接下来我们定义一个这个就叫做order timeout output tag,尽管有点长,但是大家看这个定义啊,这个语义就非常明确,对吧?呃,你有一个output tag里边的类型,哎,那家想到这个output tag,最后我们肯定还是out result,对。
05:00
是吧,我们都统一包装成一个result嘛,然后里边要给一个当前的名称,哎,我这里还是给一个all the timeout好直接放在这儿,然后接下来的第四步才是啊,就是用一个select方法对吧?调用select方法对呃,提取并处理匹配的成功支付事件,支付事件以及啊,超时的对吧,超时事件,哎,这就所有的处理都在这儿了啊啊,那接下来我们进行一个result stream,基于之前的pattern stream去做处理,然后这里边调select的方法,大家还记得那个select方法有很多重载的方式,对吧?呃,之前我们第一次点进来的时候,呃,大家会看到最简单的方。
06:00
这其实就是直接传一个pattern select function,或者你也可以把这个写成匿名函数,这都是可以的,这是我们之前已经做过的这种操作啊,你假如对超市不关心的话,那就做这个操作,那另外呢,还可以有更加复杂的传餐方式,可以怎么传传呢?诶大家看这里边我可以直接传两个参数,第一个就是一个pattern timeout的方式,后面就是一个select方式,对吧?大家说诶这里面没有定义out,那那这个是怎么定义的呢?大家看是不是下边它自己去随机生成了一个outut tab呀,对吧?就相当于随机生成,然后去做这个处理的啊,但是你这里面如果随机生成的话,你后边要想去单独把它提取出来就有点麻烦,对吧?啊,所以我们往往是用什么操作呢?大家看传三个参数的这种方式啊,就是我可以怎么样,前边给一个auto ta把这个放在这儿第一个参数,后边呢,再给两个参数,分别是patternon timeout function,用于处理超时事件,还有一个。
07:00
Pattern select方式用于处理成功匹配的事件,对吧?啊,就是这样去区分的,当然后边还有其他的一些,大家看到还有其他的一些传参方式,是对应我们这个直接传一个匿名函数的这种方式啊下面这种传参方式就是什么?就是我们前面看到的,可以给这个大家看到,这是这个呃颗里化的这种传参方式了,对吧?前面这里边呃,前面一个括号,这里边一个patternon拍out的方式,一个匿名函数,后面又一个括号patternon s select的方式,对吧?来这是没传那个output tag,如果传output tag的话,是不是就变成了三个参数了,大家看就这里边这个啊output tag放在这儿也是可理化,后边第二个参数拍on time out的方式,后边是patternon select方式啊,所以这两种方式大家都可以去调用啊,但但是调用的时候你到底要不要颗比化,要不要是啊,直接在一个括号里边传三个参数,你就得看里边到底实现的是拉姆达表达式还是呃,就是一个匿名函数。
08:00
还是我们实现的那个函数类对吧?啊,大家可能比较熟悉的啊,就觉得比较舒服的可能还是这个函数类的实现方式,所以我们接下来还是给大家讲这个函数类的事件啊,首先第一个参数,那就是传三个参数参数了,第一个把这个output tag传进去对吧?啊,然后接下来第二个那要去自己拗一个pattern timeout方式,对吧,我把这个叫做order timeout select自自定义嘛,所以直接给一个名放在这,然后下面还需要有一个pattern select方式,我把这个叫order pay select对吧,已经提取到的话,就定义这么两个就可以了,然后接下来。就是我们这个提取完了之后,最后打印的话,大家想到result STEM,直接这个主流打印得到的应该是什么呢?这应该是成功支付的这个所有的订单对吧?那这这个result啊,然后如果要是说我想得到那个真正的超时报警的啊,那些结果的话,那应该也怎么样,还得get set output对吧?哎,这里边我们要传的就是之前定义好的那个all the timeout output t啊,然后呃,这里边直接get啊,然后接下来再去print把它打印出来,当前这个才是真正的timeout,哎,这就是最后我们想看到的这个结果,哎,那这里边最后不要忘记还要把它执行起来,当前是all the time out job,把这个定义出来好,所以最后我们要实现的就是实现自定义的pattern timeout function。
09:44
哎,当然了,以及我们之前说的pattern select function对吧?这两个都要去做一个实现,好呃,那接下来我们class把这个之前我们定义好的这个东西timeout啊,呃,Timeout select啊,做一个实现,它需要去实现的东西是pattern timeout,诶大家看就是这个啊,Timeout function对吧,一个接口,然后里边同样还是一个音符的一个output,就像一个map map function一样啊,这里边我们的输入是什么?当然是输入样一类了,Order event嘛,然后输出是什么,Order result对吧?哎,当然是那个结果了,所以大家看到现在我们本来这个输出它应该是什么呢?应该是测输出流的那个输出类型对吧?只不过这里边我们测输出流跟主流的类型是不是一样了,都包成result了,对吧?所以你看起来好像都是一样,你如果想改成不一样的也可以对吧,我这里叫做也是可以的。
10:44
接下来看一下具体实现的就是一个叫做timeout的方法,里边传的参数是什么呢?啊,这就是我们说的首先检测到的这个视线是不是还是放在这个map里啊,但是大家要注意这个map里边是什么呢?就是有开头没结尾对吧?对于我们这个例子是不是肯定是有create,但是没有配啊,哎,所以你不要再去get那个配的那个值对吧?那肯定拿不到,肯定是空的,肯定是闹啊,所以这个大家需要注意一下。另外还可以拿到一个timeout stamp,对吧?呃,Time time stamp就是说超时的那个时间戳,当前到底是到几点钟的时候,到什么时间,你现在太out了,那就是我们定义的那个15分钟之后吧,啊,大家如果想把这个打印出来的话,也可以打印出来啊,所以接下来我们直接输出一个一个信息啊,首先我要拿到当前这个ID,我最后要的不就是一个order ID吗?Timeout order ID把这个拿到大家想。
11:44
等一下这这东西怎么拿呢?从pattern里边这个map里边拿对吧,Get get什么create对吧?注意千万不要去get pay,因为没有对吧,里面没这个数啊,然后接下来我们啊,那当然就是说你可以去用它的那个迭代器,然后因为知道里边就一个数嘛,只有一个对不对,我们没有定义循环模式,所以就一个数get拿到,呃,就是next把它读出来就完事了,我们要的是它里边的order ID啊,那接下来包装成一个order result,我们现在要的是呃,这个现在首先把这个timeout order ID传进来啊,后边给一个timeout报警信息对吧?呃,现在我们这个给一个timeout,比方说我们当前就是这个不用空格啊,因为前面已经有这个ID了,Timeout,然后加上我们当前的这个时间啊我呃,我们定义这个当前的时间,大家想到是不是直接把这个timeout time Sam传进来就完事了?
12:44
对吧?诶,直接调这个图斯讯方法,直接把它追加到后边,看到到底是什么时间超时就可以了,这就是一个报警的一个操作啊,当然如果说在这里边你想要去直接改数业务数据库的话,那你连接业务数据库对吧?查询对应的这个order ID,然后把对应的那个字段改了就完事儿,哎,这是这个超时的处理,然后另外还有一个呃,正常的那个处理啊,Pay select的那个处理,在这里边我们定义的是一个pattern select function下面这个interface对吧?同样我们还是一个all the event,是样例类输入all the result,样例类输出里边要实现的这个是不是差不多啊,就是一个select的方法,现在就没有那个时间戳了,就只有当前的那个数据,对吧?都保存在当前的这个,呃,就是这个map里边啊,那我们现在需要的不就是一个order ID吗?哎,那我这个叫做paid的order ID,诶那这个你就随便取了,大家知道这个map里边。
13:44
是不是所有的不是这个啊,是这个patternon啊,从patternon里边所有的东西你直接去拿的话,Print和pay是不是里边应该都有数啊,呃,你拿哪个都行,所以这里边我可以去拿配,然后我用这个迭代器拿出来之后,Next把它读取出来,然后点order ID取出来,最后包装成这个样例类里边这个就非常简单了,ID后边我直接说它成功支付就完事了,对吧?啊,Paid的successfully。
14:20
哎,所以这个其实是非常简单的一个实现的过程,我们现在可以运行一下,看看这个执行的结果是什么样的,来看到现在已经运行结束,我们输出结果了,然后在这个里边,诶,大家看到有有这么几个,有这么几个这个输出timeout的这个场景,对吧?诶我们检测一下啊,大部分都是paid successfully,我们现在看一下34767,这里边报了一个timeout,那我们要查一下啊,34767到底是什么,看一眼哦,34767大家看create的时候是0949对吧?诶所以大家看到这里边它报警的时候是什么时候呢?1849000后面多了三个零,大家知道是毫秒数嘛,呃,多加了三个零,那0949~1849加了900秒,900秒是多少?900秒有15分钟嘛,对吧,哎,15乘以60就是900秒,所以这里边它的超时时间就是创建这个。
15:21
订单的啊,Create时间事件时间,大家看到啊,他自己的那个时间戳对吧,然后加上15分钟之后的那个时间,他超时了,哎,所以这里边报了他超时,那我们看一下下面有没有啊,诶其实有它的配的,但是我们仔细看的话,会发现这里边配的时间是不是已经是2021了,对吧?啊你你尽管是支付了,但是超过了这个时间,它其实正常应该就是只要达到1849这个点,是不是应该就直接报超时了,对吧?你所以你后边这里边2021再来,其实这个就已经没用了,这数据就已经可以就直接不做判断了,对吧,直接匹配不上了就丢了啊,所以这是大家能够看到的最后检测的一个结果啊哦,下面这里还有一条大家看,还有一个这个34756,我们也来校验一下啊,34756,大家看create是0913,哎,所以它的这个定时器触发的这个时间是什么?1813对吧,900毫秒之后,然后我们再看一眼哦,它它也有支付。
16:21
它的支付是1951,是不是超过这个时间点了啊,所以这里边就支付时败对吧?呃,这就是超时检测出来是一个超时的支付时间啊,那当然大家也可以就是来一来一些这个数据是没有配的这个数据,那其实最后也是因为它,它跟那个配不配没关系,对吧,只要到了watermark涨到这个时间点,它就会直接触发这个输出这个呃,超时事件的这个检测啊,啊,所以这就是整个来讲就是这样的一个过程,大家可以想到这个具体执行的过程,它会怎么样呢?我们这数据不停的来automark不停的前进,那其实是只要在这个过程当中检测到有同一个这个order ID啊,它的这个create之后来了配了,它是不是就应该实时的输出一个一个success successful里对吧,检测到输出,然后如果说要是这个create来了之后,一直等一直没来,然后先等到了15分钟之后定时定时器触发,那是不是它就直接输出这个开out tout了,对吧,你后面如。
17:21
如果再来配的事件的话,也就没有用了,这就是整个的一个呃,最后运行测试的结果。
我来说两句