00:00
我们已经了解了,在果其实一般情况只有两种啊,那就是要么完整的匹配上,要么不匹配,那整个这个过程具体来看应该是什么样子的呢?我们可以考察一下之前定义的。连续三次登陆失败,这样一个例子里边,我们定义的这个模式里边有三个个体模式啊,假如说现在我们来了具体的数据之后。当前的flink代码到底应该怎么去处理当前的事件呢?诶,那首先就是判断,假如说我们是初始状态的话,那首先来了一个事件,判断当前是否符合begin这里定义的first。个体模式对应的这个事件符合不符合它的筛选条件啊,那假如说当前直接就这个类型不是废物,那当前就不符合直接丢弃对应的时间。
01:03
那假如说当前事件符合模式匹配的这个条件的话,发现它的类型确实是费用,诶,那当前我们就应该接受这个事件,但是注意现在并没有完整的匹配出来一个复杂事件,而只是匹配出了第一部第一个个体模式。所以呢,我们应该把这个事件接收下来,然后保存起来,保存的数据类型当然就是我们这里对应的这个map数据结构了。然后接下来如果在这个模式序列定义的过程当中,后边还有其他事件的话,诶,那接下来我们当然就是继续读取数据流接下来的事件,然后按照这个顺序。匹配接下来的第二个个体模式。那当然了,这里边匹配的话有两个条件,一个就是要看当前的这个对对应的这个个体模式的条件是否符合,筛选条件是否满足,那另外呢,就是两者之间的连接条件,竞邻关系是否满足。
02:09
那假如说接下来我们这用到的一个近邻关系是严格近邻的话,那很明显。接下来来了又一个新的数据,我们马上就可以判断。而既然他是严格禁灵吧,那接下来就是要不是匹配,要不是不匹配。如果当前。继续匹配的话,那这个很很显然跟前面的处理过程是一样的,就接收接下来的这个费用事件,然后同样把它也保存起来,也缓存到对应的这个map里面去,那当前保存的K呢,就变成了second second啊。第二次登陆失败啊,那接下来再再去检测第三次接下来的事件,那假如说当前就已经不匹配了呢。当前已经不是费用了,那那会怎么样呢?那就直接丢弃掉当前的事件,而且注意当前如果我们是严格经营关系的话,一旦这个事件不匹配,相当于把我们整个模式序列的逻辑已经确定它是不匹配的了,已经打了。
03:17
那么接下来我们就可以直接把当前所有已经保存的事件全部丢弃掉,重新开始检测。这就是这样的一个完整的过程。那假如说我们这里是followed by是。宽松净灵模式的话,那就相当于假如说我们发现紧跟着接下来这个事件不匹配的话,只是把当前这个事件丢弃,但是呢,整个我们这个模式序列还是可以继续去匹配的,那接下来继续再看后续的事件到底是否符合。那只有发现了当前的模式序列,整个这个逻辑被打乱,不再满足要求了,那这个时候我们才把从头到尾所有已经匹配到的事件全部丢弃掉。
04:07
啊,那这里面有一个特殊的情况,那就是如果我们前面说说到啊,整个这个模式序列里面可以加一个within的时间限制。如果我们这里面加上了within时间限制,又该怎么样呢?哎,我们可以看到这里边可以传入一个time,比方说我们这里边传入一个TIME10秒钟。注意这里的time本身within,这里啊要传入的time,我们看到它就是flink streaming API window time.time啊,所以这里边本质上跟我们之前在定义时间窗口时候传入的那个时间是一样的啊,那本质上我们就会发现,呃,可能这里边就跟开了一个时间窗口一样。那假如说我们当前有这样一个限制条件的话,又会出现什么情况呢?诶,那就会涉及到我们当前可能所有的事件之前啊都是匹配的,都没有破坏我们整个的模式序列,但是呢,接下来到时间了。
05:09
已经超过了我们定义的这个时间间隔了。那接下来就相当于我们要把所有的事件都丢弃掉,重新开始进行检测匹配啊,但是在这种场景下我们会发现啊,它跟我们严格意义上的不匹配还不太一样,它本质上其实已经部分成功匹配了,只是超时了,所以这种情况在有一些场景下是要特殊处理,特殊对待的。所以接下来我们就专门的来介绍一下flink cep怎么样去处理超时事件。那这个超时事件的处理呢,其实也比较简单,前面我们在看这个源码当中API的调用的时候,其实也已经发现了,我们看到这里select方法。
06:00
其实是有其他的重载实现的,我们除了直接传入一个pattern select function之外,还可以。多传入一个pattern timeout function啊,那当然了,前面还有对应的一个参数叫output time,很明显这里边我们就是要针对超时的数据进行处理,可能需要用到测殊输流了啊,那明显这是一个特殊物流标签。所以我们可以使用这种方式传入两个对应的处理的方式,一个叫做pattern select方式,另外一个叫pattern timeout function,那顾名思义,Pattern select function处理的是正常匹配到的事件,那pattern out方式呢,处理的就是。超时的部分匹配的那些事件,哎,所以这里边同样可以检测到一些匹配的事件,只不过呢,诶,我们当前还没有完整的匹配出来,就已经超时了啊,那对于这种数据,我们可能单独把它做一个处理。
07:09
那对于这个s select而言,我们知道他最后得到的data stream啊,那应该是我们正常匹配数据select function处理之后得到的结果,那对于这个超时匹配的处理结果应该输出到哪里呢?那就用一个测殊输流,指定测输流标签,然后进行一个对应的输出。我们可以看一个具体的事例。我们可以看到,当前的话,首先要先定义一个特殊路由标签。用于。表示当前的超时输出,进行超时数据处理之后得到的那个特殊出流,然后接下来呢,我们这里处理的时候,P stream直接调用点select方法传入当前的。超时的这个特殊物流标签,这个tag,然后接下来要传入两个对应的function的实例,一个是pattern timeout。
08:11
另外是一个pattern select方式,那这里的这个pattern timeout方式呢,里边关键是有一个叫做timeout的方法,我们需要去单独实现,同样它这里边也有一个map,这里边保存了,我们这里边不是超时是一个部分匹配吗?所以当前已经匹配上的那些事件都保存在了这个map里边。这里需要注意就是我们里边只能获取到当前已经匹配好的那些数据,哎,那没有匹配上的那些数据,这里是不能直接去get的,所以get到的话有可能是一个空的,那这里面我们可能要注意它抛出异常的这种情况。那另外呢,这里还有一个参数叫做timeout time step,就是超时的时间戳,当前到底是什么时候?
09:02
达到了这样一个超时的时间点啊。那这种调用方式其实是。Flink cep在早期的版本当中用来捕捉超时事件进行处理的一个接口啊,那呃,除了select可以这样去调用啊,那当然了,就是后边如果我们要去补货对应的这个数据处理出来的结果数据的话,那需要去调用当前流的get set output获取测输出流,这个方法对吧?把对应的tag标签传进去捕获测出流,然后接下来可以打印输出了。那除了select可以这么去调用的话,我们会发现在源码当中select select也可以用这种方式去调用啊,那只不过这里边传入的三个参数,首先第一个是output这个一样,那后边两个参数就是一个叫做pattern flatout function,另外一个叫做pattern flat select方式。
10:01
后面这个select function处理的是正常匹配的数据,而前面这个out function理的是超时部分匹配的数据啊,那同样对应的在这个interface这个接口里边要实现一个胎的方法,同样前面有一个map类型,保存当前部分匹配上的那些所有的事件啊,那后面呢,还有一个当前超时的时间戳,对应的还有一个collect,我们想要输出数据的时候用的是out.collect当然了,这里边输出的其实是输出到了。特殊出流啊,这里一定要注意一下,输出到特殊出流后面,还得使用我们得到的那个主流里边去调用它的get set out的方法捕获对应的。这是早期老版本里边的使用方式,现在01:13里边依然还保留了,但是呢,呃,当前这个官方官网上推荐的。
11:02
捕获处理超时事件的方法呢?其实是使用。Process啊,就是当前应该使用pattern process function来定义它的一个测试,然后去进行处理,那对应的这个处理方式呢,稍微有点不同,因为我们在源码里边可以看到。Pattern pattern string里边。Process的调用就比较特殊,它另外多参数的话,只有这个对应太information啊,单独指定当前的这个类型,只有这种参数,并没有处理超时事件的参数,诶,那这样的话,这process调用之后怎么样去处理对应的超时事件呢?我们当前的处理方法是。实现另外一个专门用来捕捉超时的部分匹配事件的接口叫做。Time out partial match handle啊,就是超时了之后的部分匹配的一个事件的处理。
12:04
那这个接口里边呢,对应的会有一个叫做process timeout match的方法,那这里面我们可以看到对应的啊,我们这里实现的这个。Pattern process function,就是extend pattern process function,另外呢,Implement这样一个接口,然后接下来要实现的就是一个process match方法,这是正常匹配件的处理,另外要实现的是一个process out方法,这是超时的部分匹配的那些事件的处理啊,那同样这里边我们会看到这个处理的过程当中有一个map保存了捕捉到的部分匹配的那些对应的事件,那另外还有一个。上下文啊,这个上下文里边同样可以去调用对应的凹的方法进行测输出流的输出,这样的话,这种方式跟我们之前process的那种使用就非常的类似,而且会更加的简洁一些,所以这也是当前官网上推荐的用法。
13:09
那接下来我们还是来介绍一个具体的案例,我们看一看怎么样去处理超时事件。我们首先想象一下这样的一个场景,就是在电商平台当中,我们知道最终获取能够创造利润的环节啊,是最终这个用户下单了之后要购买这个环节,所以呢,用户下单和支付的行为对于电商平台来讲是非常非常重要。啊,但是我们知道啊,用户下单并不代表他都会支付,那所以说一般情况在电商平台里边,包括任何有付款行为的,呃,有下单付款行为的这样的一个系统里边,往往都会设置一个超时时间。就如果说我们不设置超超时时间的话,那隔了一段时间之后,哎,那用户可能就忘了,呃,就是它的支付意愿会降低,那或者说如果我们跟根本就不做超时限制的话,那用户有可能就把这个下订单当成一个收藏夹了啊,那就直接我先不管怎么样啊,不管三七二十一看看到这个商品不错,我就先下了单再说,那这样的话就会有一个问题,就是当前这个商品我们应该被下单之后,就应该要在库存当中减去。
14:28
那如果说我们当前用户这边减去了,但是一直没有支付的话,那相当于其他用户就没有办法再去获取这个商品了,那这样显然是不合理的,无论从经济效益上,呃,就是促进用户,给用户一个急迫感,让他赶快去支付,让给平台创收,无论从这个角度来看,还是从整个这个业务逻辑的流畅来看,我们都应该设置一个订单的失效时间啊,那所以电商网站呢,往往会对订单的状态进行一个监控,设置一个比方说15分钟,如果未支付的话,就直接。
15:08
撤掉订单啊。往往会有这样的一个限制,那当然了,在具体的应用过程当中,类似的需求我们可能往往是在整个的业务后台里边直接去做处理的啊,那或者如果数据量非常大的话,有可能我们会把它放在里边,保存当前的这个订单状态,然后设置一个失效时间,这都是常规的一些应对方式。那接下来呢,我们考虑的就是,假如说数据量非常非常的大,我们能不能用flink把对应的这样一个需求实现呢?当然是可以的,我们就可以使用cep来检测当前用户的下单,之后又进行支付这样的一个组合行为,也就是复杂事件,对它进行一个处理,而且当前的复杂事件呢,应该还有一个时效,也就是微Z1段时间,这就是我们的超时时间,而且现在我们关键要处理的是什么呢?其实是。
16:08
超时的那种那些订单,我们要进行一个比方说做一个报警处理,或者说直接就跟相关的业务数据库关联,我们直接就把对应的那个订单状态把它更改掉啊,这些都是可以去做的,我们关键要去检测的是没有正常支付的超时了的那些订单。啊,所以接下来就涉及到了CP当中处理超时事件的这个过程。那当然了,现在又有一个前提,就是说,呃,我们要面对的是这样一个订单事件啊,我们可以定义一个订单事件里边,它可以有下订单的一个一个动作,也可以有支付一个订单的动作,我们可以把它合并成都叫做订单事件啊,但是现在呢,我们依然没有对应这样事件类型的定义,所以我们还是在外边先把当前的类先定义出来。
17:02
那当前这个我们就叫做order。订单相关的事件。在当前的订单事件里边会有哪些字段呢?呃,简单来看的话,当然就应该是首先应该有一个用户啊,哪个用户对这个订单做了一个操作,我们可以定义一个public。Street user ID。哎,那接下来呢,还应该有一个订单的ID,哎,其实我们会发现啊,对于订单的操作而言,用户ID反而可能不是那么的重要,因为我们当前并不是针对用户去进行各种各样的行为检测,而是要判断当前某一个订单的状态,因为同一个用户他有可能同时下很多订单啊,我们要考察的是同一个订单。创建了之后到底有没有被支付啊,那所以这里边我们关键还应该有一个订单的ID。Order ID。
18:01
啊,那对应的我们还应该有一个当前用户行为的类型。Public,我们可以定义一个当前事件的类型。最后还应该有一个长整形的时间戳。常规的一个定义,有了基本字段定义之后,哎,那另外还有一些我们需要把当前空餐的。构造方法声明出来啊,那另外。带参数的构造方法我们也都定义出来。还有to string方法。我们也完整的列在这里,方便我们做打印和测试输出。
我来说两句