00:00
我们现在已经了解了电STEMAPI里面的照应怎么操作,那回过头来我们看一下当前这个项目需求啊。两条流要做实时对照,我们可以用这个撞影去做一个操作吗?哎,其实也是可以的,因为既然是两条流要连接嘛,对吧?哎,那连接的这种方式用一个状语是可以的,那如果我们想用状语的话,两种方式用window状语还是interval状语呢?那大家就要想了,我当前这个实时对照这个有窗口吗?好像没有窗口是吧,里边之前我们涉及到的时间点是是有一个什么呢?涉及到的这个时间是不是有一个两边互相去等的这个时间啊,对吧,就是两条流,哎,我等你几秒钟,哎,你的数据如果到了的话,等我几秒钟,那在想这个时间用什么来比较表示比较合适,这用窗口比较合适吗?这其实不是针对,比方说滚动或者滑动,滑动窗口放在那儿,然后里边的数据整直接去做连接就完事了,这其实是跟跟数据来的,对不对?当前数据来了之后,以它为标准,我比方说等五秒钟的话,那是不是就相当于另外一条的数据,那个时间说可以比它大五秒钟啊,是不是就是这样的一个定义,哎,那所以大家看它是不是用区间连接inter状会更合理啊。
01:21
哎,所以接下来我们就用区间连接join做一个实现。这里边我们就是还还是在下边啊,你有一个class,当前是transaction pay match,呃,我这个给大家叫做by drawing,对吧,用join做一个实现。首先前面这个主方法里边,大家想到应该是差不多的对吧,两条流我该有的那个数据肯定都得读进来嘛,所以呃,这里面我先把它这个主方法啊,直接都copy下来,首先是order event,还有这个receipt event直接到这儿。全copy下来。
02:00
呃,当然这里面我们的这个去反射这个类型的时候,也是直接用自己当前的这个类就可以了,对吧。不要用别的transaction pay match by join好,先把这两个得到,那最后当然还应该有一个env execute执行起来里边是,呃,这个transaction pay match by。呃,Job对吧,好,那么在接下来的这一步之前,我们是直接connect连接两条流,那现在我们是不是要做一个。要做一个inter draw啊,对吧?啊,我们区间连接两条流得到匹配的数据啊,所以这里面我们做的操作前面的那条流all the even stream,它是不是直接应该要先做一个KBY啊对吧?不K是没办法去做这个区间连接的啊,啊所以这里边我们先KBY。
03:03
用当前order event,我们要的是那个transaction ID对吧,用这个transaction ID,然后接下来直接大家看inter调这个方法。里边我们join的是另外一条流receipt even stream,大家看直接传这个它直接报错了,因为对必须是不是要key by之后的t stream才可以啊啊,所以这里边我要做一个KBY里边选取的当然还是get transaction ID对吧,还是以这个trans在值ID作为一个连接条件,那接下来带一点就能看到是不是只能去between啊,那这里面between给一个上下界给多少呢?哎,这里大家就想到,如果我要跟之前的这个时间一样的话,之前我们是配饰件要等对应的这边那个支付到账时间的话,是等五秒钟,然后支付到账时间等这个配事件呢,是等三秒钟,那你说现在我的这个drawing,这是order pay的那个事件,再连接另外的一个到账事件,对吧?那所以是不是以这个配事件为准啊,他开一个时间范围,那开的这个时间范围应该是开多少呢?
04:15
五三吗?诶,大家想五三的话,那就相当于是它加五到加三之间,这不对了,对吧,是不是下届一定要比上届小啊啊诶大家想是有有同学说是负五三,大家想如果是负五三的话,意思就是是不是比它。早减五,那是不是早五秒钟的那个到账时间和比他迟三秒钟的到账时间这个范围内的都可以连接起来,对吧?这个合理吗?我们想要定义的是这样的一个东西吗?其实应该是我要等它是等五秒钟,那是不是倒的时间应该比它要大五秒钟啊,对吧?如果到的时间要等他三秒钟的话,那是那是到的时间比他早三秒钟对不对?哎,所以接下来我们其实应该是什么呀?
05:04
是不是负三到五啊,哎,所以这里面大家注意一下这个上下界啊,按照我们的定义time.second现在是负三到time.second正五对吧,这样的一个范围就是负三到正五区间范围。然后接下来,呃,做了这个begin之后,其实下一步就没有别的选择了啊,要要如果你想把这个上下界去掉的话,你可以调一个这个方法对吧,这个可选也可以不调,那接下来是不是就只能去实现一个process join function了啊,就是这个对吧?所以这里边我们用一个transaction pay match,呃,我们叫detect,之前我们那个叫detect对吧,然后现在我们叫by join,把这个做一个自定义的实现啊,那最后这个得到的其实就是一个result stream对吧,其实大家想到了就是最后这个result stream啊,我们在之前是把当前的正常匹配的数据输出在了这个result stream里边,然后是不是在这里边定义那个测殊出流去做那个,呃,就是未匹配的,没有匹配的那个数据的报警啊,那大家想现在这个process function式里边状元方式里边是有上下文,然后是。
06:34
可以做测试物流对吧,但是我能检测到不匹配的事件吗?哎,这里面大家其实一看就看到了啊,这个实现自定义的process drawing function public,哎,这个啊,Public。No。Public static class,呃,当前我把这个实现出来,既然是process家族,大家知道肯定是extend对吧?抽象类嘛,所以这里边我们实现的是process drawing function里边三个泛型两条流的数据类型,印一印二还有输出对吧?那大家知道硬一的话当然是all event,硬二是不是receipt event最终的输出类型,哎,那大家可以借鉴之前我们这个输出是不是包装成一个二元组,直接匹配输出就完事了啊,所以直接把这个copy一下。
07:34
二元组。好,TEMP2引入,大家看上边这里边的话,这个得到的数据类型是不是也是应该是一个二元组类型啊,那其实大家发现了,这里边我们能够匹配的到的啊,所有的数据是不是都要调这个process element里面的方法呀,那只要调到这个方法里边来,是不是都应该有一个order event走有一个receipt event呀,有那种只来一个的情况吗?这里面没有对吧?
08:04
所以大家注意这里边ctx,我们是可以用ctx去做output做一个测试物流,但是这里边我们找不到那个匹未匹配的事件了,对吧?啊,所以大家注意用这个draw影啊,其实主要是处理正常匹配的事件的,而不是处理那个就是报警异常的对吧?啊,所以大家要注意一下这个过程,那当然这里边我们已经知道怎么回事的话,是不是直接alt.collect就可以主流输出这个正常匹配结果了,你有一个二元组对吧?把当前的left和right是不是拿出来就完事了,这不就是我们最后要的结果吗?好,这里可以运行一下,大家看一下就只能得到一个正常匹配的数据,而拿不到未匹配的那些all matched PA和matched receip receip了,这就是我们整个呃,Draw的一个连接的过程,好,我们看一看这结果啊,大家看这里边就得到的都是正常匹配的结果,对吧,这就是关于这个join引的用法大。
09:04
大家可以再总结一下啊,我们就用不同的方法实现了这个需求,当然对于这个需求而言,其实看起来还是用connect更合理一点,因为我们可以拿到,呃,就是测试主流里边这个未匹配的那些数据。
我来说两句