00:00
接下来里边的处理逻辑呢,其实也非常的简单,那就是判断当前的事件如果到达的话,那就判断另外一条流理的事件到达了没有,那我们现在其实就是要去判断一下。目前的状态,呃,既然我来了第一条流里的数据,那么就要判断第二条流里的数据到底来过没有呢?那么对应的状态是保存在了third party event里,所以我要获取一下third party event里到底有没有值,哎,那所以它的这个值怎么拿呢?掉一个点。Value方法。判断它是否为空。如果不为空的话。说明我们现在就已经对账成功了啊,对应的这个数据都已经来了,所以这里就可以直接out.collect输出一条信息,我们说。某一个订单啊,那就Dollar value点下划线一,这是订单编号对照。
01:00
成功。那如果要是当前third party event本身真的就是空呢?A else,如果说它为空的话,那就说明另外一条流里对应的事件还没有到达,那没有到达的话,我们直接报警吗?不是,现在是要等待好。所以接下来如果。另一条流中的事件没有到达。九。注册定时器开始等待,哎,所以现在我们要做的事情是保存状态注册定时器,那定时器的话调用的是timer service下的register,现在要注册的是事件时间的定时器,Even time timer里边对应的时间戳,那就应该是基于数据里边的时间戳,然后再加上五秒钟,哎,那所以我们这里就是value点下划线三去加上。
02:04
5000,哎,这就是五秒钟之后的一个定时器,那另外需要注意的是还得去保存当前的状态啊,就是接下来我们是要进行等待,那如果另外一条流里的数据已经来了,来到了之后怎么样才能判断我们两条流里的数据都已经到了,对账成功了呢?哎,那显然是得我们这条流里的对应的那个状态里边要有值才可以,哎,所以当前我就应该是保存。当前事件到。对应的状态中。所以这里面我们需要APP event去做一个set操作,那set的话需要这样的方法是update。Update里边传入的就是一个三元组,那这个三元组当然就是value,这就是我们处理的一个基本逻辑啊,那当然了,如果说我们考虑的更多一点,考虑到系统性能的一些优化,因为我们想到你这里边定义了状态,如果说你只定义状态不清理的话,那之后我们的状态就会越来越多,最后会耗尽系统资源,所以这里边可以做一个操作,就是假如说我们对账成功之后。
03:14
那接下来其实就所有的状态都没有用了,我们已经已经找到对应的数据了,那显然我们接下来不会对他再去做判断了,那所以对账成功之后,应该做一个操作,就是清空状态。清空状态也非常的简单,哎,我们现在就直接third party调一个点clear方法就可以了,所有的状态都可以调这个点clear方法。那APP event要不要清空呢?啊,其实我们看这个逻辑就知道啊,如果走了if这个分支的话,那APP event里边就没值,那也就不用清空了,哎,所以这就是我们完整的处理逻辑了,那这一部分。当然就可以完整的复制到process element2里面来,因为他们是完全平等的,两条流的处理逻辑完全对称,只不过就是把这里的search party改成APP event,哎,那然后接下来我们这个清空也是清空APP event。
04:12
然后接下来我们这里保存事件的时候呢,保存的就应该是third party event啊,然后进行一个update操作。而上面我们注册定时器的时候呢,提取时间戳用到的也是四元组里边的最后一个元素,所以是下划线四,这就是Co process function里边具体的实现啊,那把这一部分逻辑都实现完成之后。最后我们如果想要看到结果的话,因为已经是一个string类型了嘛,那就直接print打印,最后不要忘记execute,执行起来就可以了。这就是我们对于每个事件到来之后进行的处理流程,诶,但是我们整个这个Co process function这样就处理完了吗?没有,因为我们只处理了对账成功输出信息这样一个场景。那如果说。
05:05
当前还没有对账成功的话,我们只注册了定时器开始等待,并没有说什么时候就报警,哎,我们没有输出对账不匹配的那些情景,那什么时候报警呢?那自然就是要等到定时器触发的时候就要进行报警了,所以这个时候呢。我们还应该去实现另外一个方法,那就是on timer on timer一旦被触发,那很明显就说明我们当前有某一个事件没有到来,导致我们没有对账成功,哎,那所以这个时候我们就应该输出一条报警信息了,但是这个报警信息我们判断它到底应该是哪个数据没来呢?判断的方法当然就是看我们之前状态里边保存的数据,哎,那如果说APP event里边有值,Third party event里边没值的话,它是空的话,那就说明他没来嘛,哎,谁是空就说明谁没来。
06:02
所以接下来我们的处理策略也非常的简单。就是。判断状态是否为空,如果为空,说明对应的事件。没来。但是仔细一想的话,我们这里的逻辑其实还是有一点漏洞的,呃,因为我们之前啊,如果说某一个流里的事件来了之后,我们注册了定时器,开始等待另外一条流里的数据到来,那另外一条流里的数据到来之后呢,我们就会输出对账成功的信息,就会清空状态,按照这个状态不就变成空的了吗?而且我们在这里呢,又不会删除定时器。那不删除定时器的话,到定时器触发的时候,我们一判断状态为空了,那就又会输出报警信息,事实上诶当前是已经对账成功清空状态的结果。所以我们这里边还得去判断一下,到底是哪种情况触发了当前定时器,当然为了解决这个问题的话,一种方式啊,最为简单的方式其实是在之前清空状态的同时,后边我们去给一个删除定时器的操作啊,我们这里就ctx timer service去。
07:19
Delete time timer,但这里面又有一个问题,就是说要去删除一个定时器的时候呢,必须传入对应定时器的时间戳,但是我们现在怎么知道之前定义的那个定时器它的时间是什么时候呢?因为我们现在只能知道后到的这个事件对应的时间戳,先到的那个事件的时间戳我们是不知道的,而定时器是根据先到的事件的时间戳加了五秒钟去定义的,哎,所以这个就比较麻烦啊,那干脆我们在这里呢就不去删除定时器了,而是在这里做另外一个反向的判断就是。如果某个状态不为空的话,那说明。
08:03
另一条流中。对应的事件。未来。啊,那这个逻辑可能要稍微的绕一下啊,就是我们当前其实有两个状态,主要就是有四种对应的匹配的结果吧,就是一种是两者都为空,那两者都为空是什么情况呢?其实这个很简单,就是两者都为空,那肯定就是被我们清空了,所以这种情况下干脆我们什么都不要做,那还有一种情况就是两者都不为空,其实两者都不为空这种情况不会出现,因为假如两者都不为空的话,那他总是有个先来后到啊,后到的那个事件,他到来的时候就会检测到匹配成功,就会输出对账成功,然后清空状态。啊,那所以我们接下来要考虑的就只有某一个为空,某一个不为空,那这种情况,那就是不为空的那个状态,它对应的流里边的事件是已经到来了,而另外一条流里边的数据没有到,所以我们没有对照成功,数据出现了缺失啊,哎,那所以按照这个逻辑的话,我们就判断。
09:10
到底哪个状态不为空,然后输出对应的报警信息就可以了,这个逻辑稍微的有一点绕,但是避免了一些复杂的情况,所以这里边可以做一个预判断,如果APP event。它里边。不为空的话。它不为空,那自然就说明对应的third party event为空,这个事件没有来,所以这里边我们可以直接输出一个报警信息了,out.collect我们直接输出,哎,那订单ID的话,我们可以直接在APP event里边,Value里边,把对应的下划线一拿出来,这就是订单ID啊,那么它对照。失败。第三方。平台支付。
10:00
事件。味道,诶,这是我们对应的报警信息输出的一个检测结果啊,那同样的道理,如果third party event里边有值不为空的话。那就说明APP event里边一定没有值,对应的事件没有到来,出现了异常情况,哎,那所以这里边我们输出的信息就应该是APP支付事件味道,哎,这就是我们整个的这个处理的过程啊,当然了,最后为了释放资源的话,我们也可以做一个状态的清空。APP event clear third party event clear啊,那如果本身这个状态里面没有数据的话,Clear的话,那就相当于什么都不做了啊,得到的最后的结果还是清空的一个状态。这就是我们完整的处理流程,所以接下来我们可以运行一下啊,这里如果要想看到最终的结果的话,当然这还不够啊,我们只是定义了两条流连接操作之后啊处理的流程,然后呢,得到的这个STEM类型的数据流,还应该做一个控制台的打印输出,那要加一个。
11:09
Print这样一个think,任最后env execute执行起来。完整的流程都已经实现了,接下来我们可以运行一下。看一看效果是不是符合我们的预期。我们可以看到,果然检测到了ORDER1是对账成功的,因为我们在两条流里边都出现了ORDER1的支付成功事件,而且他们的时间戳相差不到五秒钟。另外呢,ORDER2那对账失败,第三方支付平台事件未到,而另外ORDER3呢,也是对账失败,APP支付事件未到,哎,这就是我们想要的结果,这就是实时对照这个需求的具体实现。
我来说两句