00:00
梳理清楚了思路啊,接下来我们其实这个思路呢,就是两条流里边的数据分别来,然后来到process element1里,我其实就应该要判断它对应的相同K相同一个单的另外一个那个到账信息来了没呢?这得保存一个状态了,对吧,因为我不可能是。同时又在处理process element1的时候,又在处理process element2,这是不可能的嘛,啊,所以我需要保存一个状态,那在前面我们就要定义一下喽。我们首先以状态。变量。用来保存已经到达的事件。之前我们定义的是list state啊,就是在这个top n的时候,我们是用了一个列表状态来保存,那现在其实不需要列表了,那你也要列表啊,不是两个事件吗?呃,其实不是哈,我们这两个事件类型还不一样呢,一个是三元组,一个是四元组啊,那所以这里面我们就单独定义两个状态,这两个状态呢。
01:05
都只是一个值而已啊,所以在flink里边,它是有一个专门的value state value state对应着之前我们那个list state啊,这里有一个value state。就表示一个值的状态,分读值的状态啊,那当然了,首先应该有一个单元组的值状态。当前这个我们就叫做这是APP这条流里的啊,我们叫做APP吧。然后同样的非常类似啊,我们再定义一个value state。里边应该是一个四元组。这是从另外一条流,第三方支付平台一条流里边拿出来的对应的这个数据啊,那保存出来的数据啊,那我们叫third。Event。Third party unit date,好,先把这个定义的都定义出来,那另外大家会想到之前我们不是说过吗?在这个top n里边要使用这个状态的时候,哎,你不能说直接拗一个什么什么状态就完了,它的这个获取是要从运行是上下文里边去获取的,诶那运行上下文呢?诶,那又涉及到一个就是你必须在当前任务已经启动,有了任务实力的时候才有运行是上下文,所以我们需要在open生命周期里面做这件事儿啊,那我直接把上次写的这个给大家copy过来啊。
02:34
Open生命周期放在,因为它是开始的时候,我们直接放在前面。这边我们就涉及到了一个。呃,APP state这边我们就应该直接就可以,然后这里边就是一个AU state script里边,这里的类型当然是三元组,后边这里我们就要改一下名字了啊这个叫。Event。然后后边的这个类型。
03:03
啊,这个是ta,然后里边的类型那就是啊,就是一个一个去写了啊,这个就很麻烦啊,三元组就是有这个问题,High,第一个是string,然后同样第二个也是string对吧,第三个是long,我直接copy一下。好,唐正行了,好放在这里啊,大家看到就是这个APP event啊,这样的一个定义啊,那与之类似的,我们还有另外一个。第三方支付平台的这个事件,同样get wrong time,然后get state,对吧?啊,那接下来我们new一个a state script,这是一个四元组里边这个叫做。Third。Party event好,然后后边再来一个哦,又是一个四元组,这个就麻烦了啊,我们直接copy一下上面的这个。
04:05
好,然后后边中间得加一个啊。Types。这样的话,四元组的类型就定义好了,好,那接下来这里来一个分号。是我们对于这个状态的使用,这麻烦的就是这个啊,就是状态我们定义就得定一半天啊,然后接下来的话,其实反而就简单一点了,我们对账的时候是怎么样的呢?那现在process element1如果要调用到的话,现在应该来的就是来的是APP event对吧,来的是APP流里边的事件,那我们是不是要看。是对应的定义。条流中事件是否。来过,哎,那咱自然就知道了,那我们之前的这个保存的是什么?不就是之前那个是否来过吗?啊,那这里边我们其实大家会发现,如果你只是要保存是否来过的话,一个布尔类型的值就可以了,我们现在是把它完整的那个事件保存下来,主要是考虑到你后面要匹配的话,输出的时候好输出一点,从状态里面可以把之前的那个事件全拿出来显示出来啊,这样方便一些。所以这里边我们就if来做一个判断吧,哎,那就是third party even state里面的值怎么获取呢?它是可以直接调一个value方法直接拿到的。
05:28
他拿到的就是一个呃,四元组了啊,那如果要是没来过的话,他就应该是空。如果要是来过的话就不为空了,那不为空的话,来过,那么我们当前就out.collect输出一个信息啊,那那这里边我们就直接来一个。那就是实时对账嘛,对吧,对照成功。我们接下来可以加上当前的两个数据啊,一个是当前的value。
06:00
然后另外。我们可以再空一格,加上the party isn't state at value啊,知道这个元组类型的话,你直接string对吧,直接把它输出就可以了,然后这里要注意,既然已经对账成功了。那这个状态就需要了,对吧?哎,所以这里边我们可以清空状态,这里边清空状态的做法其实就是直接调一个clear方法就可以把它搞定啊,那当然了,另外一个状态APP state大家会想到我,我就没给他写过东西,那正常来讲应该没有东西对吧?啊,那这样的话就什么都不要干啊。那如果要L呢,他要没来呢,没来,如果另外一个没来,是不是相当于我得等另外一个来啊,那等待的过程当中是不是应该先把状态更新啊,给更新状态,要注意这个更新的状态,呃,不是third party event的状态,而是目前APP event state。这里要更新它的更新呢,也很简单,来一个值放进去就完事,对吧?哎,把这个value放进去就完事,然后接下来还得记得。
07:09
定义注册,一个定时地开始等待另一条。流的事件,看这就是这个连接起来之后处理的一个过程,用状态来表示,另外一个流里面的是数据是否来过,如果要是已经来的话,诶,那就其实把它存起来了嘛,存起来两个就都拿到了,我们接下来就可以把它输出了啊,那那接下来就开始注册这个定时器,这大家都很熟悉,CTx.service timer当前的这一个,嗯。什么时间呢?哎,那代码,那当然要以当前数据的时间戳F2啊,比方说我们等五秒对吧。么加上5000。
08:00
五秒后的定时器。后的直器开始等待另一条流的事件,哎,这就是这样一个处理的过程。啊,那这个如果要是已经搞定之后就知道了。后面process element2是不是完全一样啊,啊,这完全匹配嘛,对吧,直接拿过来之后,只不过要改的是我现在找的就不是third party event了,我要看这个APP event到底有没有对不对,哎,如果有的话是那个先来了,那现在就匹配嘛,所以APP event state是否为空,那这里同样也是目前如果要是这样的话,我这个是APP event state。而且正常来讲,我应该他在,他是在前面的,对不对。是在前面,然后后边。哎。A。空格加上Y领,这是我们能够想到的一个处理的过程啊。那就是对账成功,然后清空状态,清的也是当前这个APP event state的状态。
09:00
好,然后接下来呢,更新状态,如果要是没来APP event state没来,那我是要更新当前第二条流对应的那个状态,对吧?而party state,然后把它更新出来,然后开始等待。就不是F2了,F3啊,哎,我也等待五秒钟啊,那或者比方说啊,我们这个正常情况下都是第一条流的数据先来,第二条流就一般情况正常应该是后面才来的话,那我们这个可以等少一点啊,甚至可以不等,就直接以这个F3作为当前的一个触发也是可以的,大家想这是什么意思呢?就是有可能我当前的这个数据是来了,但是wal mark不一定到了这个点啊,因为water mark有可能有延迟嘛,有乱续嘛啊,所以我就等一等这个乱序数据,就我默认之前的第一条流APP的event,它是应该在这个之前就已经来了,但是呢,它有可能乱续啊,啊乱续的话,那是水位线还没到,它是有可能这个等下才来,所以我就等到这个时间点就好了,它不会比我更慢啊,这个是完全可以的啊好,那这就是我们。
10:08
Process element的这个处理过程另外还有一个非常重要的方法就是on timer是如果出现。一直等,等到这个定时器都触发了,那就说明什么呢?说明定时器触发,呃,就这里边,因为我们没有删除过定时器,所以中间如果说哎,我们这个第一条流的这个数据先来对吧,来了之后注册一个定时器,那第二条流这个来了之后,即使匹配成功。哎,大家看,如果你这儿把那个定时器删了的话,那就那就没这个问题了啊,如果这儿我们不删,那是不是也会触发啊,哎,所以接下来我其实要判断一下,那判断什么呢?就是如果当前两个状态都是空的话。那其实就是已经对账成功了,已经被清空了,但是如果当前两个状态里边某个状态还在的话,那说明是不是另外一个没来啊,啊对吧,所以我们是判断动态。
11:06
如果某个状态互为。说明另一条留。中至建没来。那有没有可能两个状态都不为空呢?没这种可能,两个状态都不为空,那他总有个先后嘛,后面那个到的数据来的时候就应该已经匹配上了,对不对啊,就应该是输出成功,然后清空了啊,所以这个就不可能出现两个都走到这个else分支里边去,去更新状态的这种情况啊,那所以这是从逻辑里面能够分析判断出来的啊,所以接下来就是如果是appate,它的value如果不为空的话。不,他不为空啊,那接下来我们知道了,目前是APP event state到了,呃,APP event到了,第三方支付的信息没有到,所以我们可以直接输出一条out.collect这个信息啊。
12:08
当前,那就对账失败。那就是可以把当前的这个APP state,它的信息放在这儿。然后那么后边再加上一个。第三方第三方。支付零癌?近期。被盗。啊,这就是我们能够处理的一个一个状态,对吧?啊,那当然了,有if就有else了。呃,其实大家会想到这其实也不需要else了,对吧,如果它为空的话,那就那就不用管它了,关键就看另外一个喽。Third party state,如果他不为空的话,也是对账失败,不过呢,当前输出的是他的信息,然后后边我们提示的应该是。
13:09
应该是APP信息没有收到。这就是当前我们能够做的这个对应的这些事情啊,那最后的最后啊,那应该还有一个就是所有的数据都已经做完了之后,那我们应该把所有的状态清空啊,这个我们就不去单独判断到底是谁有谁没有了,全部清空。即使是空的,它也可以调这个不再再清一次嘛,所以直接清理状态就是完整的一个处理流程,少了一个花括号。换的话就可以了。接下来我们运行一下,看一看当前的这两条流啊,得到的信息是什么样的。运行一下。哦,大家可以看到。我们能看到啊,对账成功,ORDER1的话,大家看就是按照key分配的对不对,所以它可以对账成功啊,这个第一秒的时候有APP的信息,第三秒的时候有这个third party的信息,然后呢,呃,这个ORDER3也是啊,3.5秒的时候有他的信息,四秒的时候有这个第三方支付平台的信息,而ORDER2啊,我们看到它只有这个APP上的信息,第三方支付平台信息未到,所以就对账失败。
14:23
这个就很明显啊,就是你如果出现这种情况,我们这儿只输出了一个文字对账失败,那大家会想到实际如果项目当中做这个风控管理的话。我们这里就可以比方说啊,直接改数据库里边某一个关键的字段啊,把那个字段一改,你一看到那个变化了,就可以发出某种某种提示啊,我们在应用上就可以显示出来,或者说啊,你就直接可以调用某些那个短信平台的接口,出现这种对账失败的时候,给某个负责人就发一条短信啊,提示一下现在有问题了,这样都是可以做到的啊,这就是项目当中做风控的一些具体的手段。
我来说两句