00:00
当前的类已经定义好了之后。接下来我们就可以。定义这样的一个新的。抓把类嘛啊,那当前我们要定义的就是一个。Timeout订单的超时检测。Example。首先我们还是把整个的框架先。写出来。呃,同样,我们要使用当前的CP这一部分内容,先要创建的执行环境。因为。然后不是一般性的把全局的并行度设成一,然后接下来首先第一步我们应该要。获取数据流。当前是一个订单事件流啊,那这里我们可以直接因为。点还是from elements,直接定义一些测试数据就可以了,那这里面要new的是all the event。
01:04
啊,那对应的这些数据我们还是直接参考。文档当中定义好的这些内容,诶我们可以定义还是USER1和USER2,我们可以看到这里user一下了一个订单啊,ORDER1CREATE把它创建出来,然后呢,诶过了一段时间之后,还有把这个订单做了一个修改啊,我们是中间可以做各种各样的操作的啊,把订单下了之后还可以进行修改,然后呢,呃,对应的这个过了一段时间之后又做了一个支付操作,那我们当然是要判断它到底是否在15分钟之内。做了对应的一个订单支付的行为,那如果支付的话,当然它就是没有问题的。那这里USER2的话,下了一个订单二,后面还下了一个订单三,后面是把订单三做了一个支付,但是订单二没有支付,那很显然现在我们就要判断USER2的这个订单二就应该有一个超时的报警信息输出了。
02:05
所以我们现在的需求主要是要做这样一件事情,我们可以把这一部分数据直接先copy过来。那接下来我们得到的这个事件,这个事件流就可以叫做order stream。这是我们首先得到的第一步信息啊,那当然了,后边这里其实应该要做一些其他的转换操作。因为我们这里边涉及到了时间戳嘛,如果我们用事件时间的话,当然是要提取时间戳和auto的,所以还是assign。然后把对应的这个先写出来,我们现在定义的还是按照时间顺序直接放进来的,不涉及到乱序,所以还是点zero,然后接下来with time samer,你有一个zable。这里边我们想要的这个类型啊,应该就是当前的all。
03:03
把它放在这里。然后里边我们需要实现的就是一个这样一个方法。提取的当然就是element。点这里我们给的还是好。那。把这个定义完了之后,当前的这个类型就不应该是。Thatre s了,而是应该是一个啊,对应的这里应该是一个single。Output STEM operator这里要注意啊,我们后面掉了这个之后,已经是一个转换成一个单一输出的硫算这样一个类型啊,尽管他们本质上都是data,但是他俩并不是同样的数据类型。好,有了数据流定义之后,接下来就可以定义我们的模式pattern。那当前的模式是什么呢?其实这个非常简单,就应该是先来一个create事件,对于当前的这个order而言啊,所以后面我们会发现应该是针对order要做一个K,做一个分组啊。
04:11
那这个可以放在后边,就是第三步把模式应用到订单事件流上的时候,我们再去做这个K操作,那现在呢,定义模式当然就是针对当前这个order,首先应该有一个create事件,然后接下来还应该有一个配事件,有这两个事件,然后还要定义一个,他们在15分钟之内要完成。这就是我们呃能够想到的这个模式的基本定义过程,所以接下来首先定义模式的话,t.begin。那这个呢,前面加一个当前的类型参数,All the event。第一个事件,我们直接就把它叫做create好了创建一个订单,然后接下来给一个对应的约束条件,当前个体模式的条件,那很明显这个条件比较简单嘛,Condition里边我们要求就是。
05:10
当前value的event type必须要是。Crate。这就是我们的基本诉求。然后接下来呢,自然就是紧跟着啊,那我们要考虑是不是紧跟着呢,那就比如说这里的订单一USER1的订单一create之后,他插入了其他事件,做了一个moify,这显然是可以的,当前订单已经下单之后,是可以再去追加修改工作的,只要他在规定时间内能够做支付就可以了,诶那所以当前我们并不是严格净邻的关系,而是宽松敬礼,所以接下来并不是,而是followed吧。那follow后面这一个事件,我们把它命名成配支付事件啊,那同样对应的这个VR类型里边给VR条件里边给一个简单条件,我们需要的是value.even type equals。
06:11
配这样一个事件。这就是我们基本的两个个体模式的一个组合,然后接下来还应该有一个限制条件,时间限制,要求在15分钟之内完成。点minutes 15。啊,那对应的这个定义好了,这就是我们想要的一个模式。接下来是第三步。当然就是要将当前的模式。应用到。数据啊,就是订单事件流上。得到一个pattern stream啊,那所以这里调用的时候是CP.pattern里边传入的首先是一个data stream,我们这里还要做一个K,所以all the stream.kby当前的event。
07:05
使用。它的order ID,注意这里不是user ID了,使用它的order ID作为keep进行一个分组,后边把对应的pattern进来。这里得到的是一个。那后边的第四步,当然接下来我们就应该要去检测。匹配到的复杂事件进行提取进行处理了,注意这里边呢,诶,我们关键其实并不是检测匹配到的事件,关键是要检测那些。啊,相当于我们这里边就是只有create没有配,然后就到了这个超时时间的那些部分匹配的时间,所以我们关键是要做这个超时处理,而超时处理的对应的复杂事件的,呃,对应匹配到的事件呢,应该是只有开头没有结尾啊,就是只是有一部分能够匹配上,我们关键处理的是这一部分,那所以接下来呢,我们还需要定义一个测输出流,然后用于输出当前。
08:13
超时匹配数据的对应的处理结果,那所以这里面我们首先定义。一个。测输出流标签。我们可以new一个output,那这里边我们直接给的,呃,既然是测输出流嘛,我们直接输出一个报警信息就好了,直接给一个string类型。呃,这里里边可以给一个当前的ID啊,当前这个标签的ID叫做timeout。当然了,我们是使用这个匿名内部类的定义方式把它定义出来啊,那这里边我们可以把它叫做。Timeout tag。好,先把它定义好。
09:00
接下来的第五步,那就是要把我们完全匹配到的以及超时部分匹配到的那些复杂事件全部提取出来,然后分别进行处理,输出到主流和测试出流,然后最终显示出来,这就是我们想要做的。所以接下来就是。将。完全匹配。和。超时部分。匹配。的复杂事件。提取出来。进行处理。所以当前我们标准的处理方式,那应该是基于pattern stream。去调一个process方法里边,这里边我们要传入的应该就是一个pattern process方式,那这里边我们干脆单独的去做一个定义吧,我们这里把它单独定义成一个自定义的类。
10:03
我们把它叫做order pay match。这样的一个类。接下来我们的关键就是实现这样的一个自定义类。自定义。Pattern。Process。Function。我们现在public static class。Order pay match。首先,我们需要去extend一个pattern。Process。Function。当前pattern process function有两个,一个是输入的数据类型all event。另外一个是当前。完全匹配的符合我们当前模式筛选条件的一组复杂事件处理之后的结果,当前我们也不用专门去呃对应的去做相应的输出,我们干脆就直接输出一个字符串吧,还是输出一条信息就可以了。
11:05
就是哪些订单是正常支付了啊,只要输出一句就可以了,这是一个string,然后另外还应该有一个implement,接下来实现的接口叫做time out。Out。Match handle,我们当前所要实现的这样一个接口,当然了,它也有对应的泛型,就是我们当前的数据类型。Older event。好,接下来我们直接实现里边的抽象方法,一个叫做process match,另外一个叫做process timeout match,一个处理完全匹配的数据,另外一个理超时部分匹配的数据,啊,那这里面就非常的简单了。这里首先我们可以。获取。当前的。支付事件。
12:00
啊,那么这里我们可以直接从match这里,这张map里边去提取对应的。呸。这样一个K对应的,呃,对应的这个事件,那么我们可以知道,当前因为没有循环模式的定义,只有例里边只有一个事件,直接GET0就可以拿到它了。我们可以把它叫做当前的。Pay支付事件。然后接下来我们利用这个支付事件,就可以直接输出一条信息,告诉我们到底是哪一个用户,哪一个订单做了一个支付,哎,那所以这里边我们可以直接out点直接输出啊。当前。用户。那我们应该从pay event里边提取当前的user ID。然后接下来。加上一个订单的ID。
13:02
Order ID。然后接下来我们可以知道这个已支付。这就是我们所输出的一条信息,那同样的,如果说当前我们检测到了一个超时未匹配的数据的话,部分匹配的数据的话啊,那同样我们也可以用类似的方法做一个获取。但是这里需要注意的是,当前就肯定不会获取到对应的配时间,因为我们当前就是部分匹配吗?只有create没有配,所以我们应该获取当前的create事件。同样都是GET0,因为当前没有循环模式列表里边只有一个事件,把它提取出来之后,诶,那接下来我们还应该有对应的一个测输出流标签,因为我们当前只能是进行测输出流的输出,诶这里面没有这个collect out了啊,只有ctx,所以应该是cx.output用这种方式我们需要把测殊流的标签传入,然后把当前要输出的数据写进去,那这里面的这个标签呢?啊,因为已经是在外边去做了一个单独的定义,所以我们还需要把这个再copy过来。
14:19
同样的一个标签。直接copy过来啊,那在这里我们可以把。Out tag进来啊,那接下来我们还是可以。类似的。这样的一句话直接写在这这里面,只不过改成create event。对应的这个订单啊,那这里边超时未支付。这就是我们整个检测报警的一个流程啊。那当然了。这只是实现了当前的。Pattern process function,而且对应的实现了我们超时处理的这个接口,那上边呢,还没有处理完,我们边是通过这样一个。
15:04
这样的一个定义啊,应该要得到一个。最终匹配的结果一个,然后接下来呢,我们可以做一个打印输出。那这个打印输出直接result.print的话,打印出来的其实是正常匹配上,也就是正常支付过的那些订单。所以这是。而如果说我们想要看到超时未支付的那些订单的话,那应该是result点要掉get set output这个方法,那这里我们要传入的其实就是tag。然后接下来。当前是一个timeout。这就是我们完整的处理过程,最后不要忘记V执行起来。这就是我们对于订单超时进行检测的过程,我们可以运行一下,看看输出结果是不是符合预期。
16:06
我们可以看到很明显,用户USER1他的订单ORDER1是正常支付了的,没有问题,中间做了修改,然后做了支付,而USER2的订单ORDER3同样它也是create之后做了支付的。但是USER2的订单二。Create之后并没有支付,所以输出了一句超时未支付。这就是我们使用CP去处理超时事件的过程。
我来说两句