00:00
给大家简单的介绍了一下在data STEM API里边的draw API draw相关的操作,大家会发现它应用的场景其实还是比较局限的啊,调用的模式也比较固定,呃,那对于我们当前的这个需求,两条流合并,能不能用join API直接做一个实现呢?啊,其实也是可以的,那大家想一下,我们到底是应该用window drawing还是用inter draw呢?诶,通过当前之前的学习,大家也发现了,肯定是internal,对吧?啊,因为大家想我们这里边主要这个时间判断是什么呢?就是两条流里边一条等,另外一个一条流嘛,啊对吧,那那这相当于是什么,我们是不是就设置一个上线下线两条流互相等就完事了,对吧?我觉得一条流撞进另外一条流,可能这条另外一条流比它这个数据早,也有可能比它迟,这不就是我们说的那个上下限,上下界的那个问题吗?那那前面我们那个无撞移的话,那得开窗这个场景其实根本用不到,所以接下来给大家用这个interval draw语啊,再做一个简单的实现,定义一个这个新的object transaction match,我们来写一个withd Joy。
01:12
那前面的这个代码这个就不用说了,直接抄就好了啊,跟前面完全一样,直接把它copy过来。我们直接copy到这个前面,处理到这里为止对吧?呃,上面还是对于那个get class,这个就不需要了,另外我们把这个影视转换引入接下来的这两条流,哎,大家看现在都已经做过KBY了啊,那就非常简单,是不是可以直接inter join了对吧?所以这里边我们定义一个result stream,然后前边的order event stream,直接去有一个方法interval draw对吧?然后把这个receipt event stream再做一个连接,哎,那另外接下来做什么操作呢?没什么好说的,Between对吧?只有这样一个方法定义上下界,那这个上下界这里边的这个time还是那个time,就是还是那个window time.time对吧,跟之前我们定义的那个time是一样的啊,然后这里边关键是到底给多少呢?诶,前面我们说这个一个三秒一个五秒嘛,那关键这里边就是你到底是负三正五还是负五正三呢。
02:19
如果要跟之前定义的一样的话。那我们就得看之前到底是谁等谁了,对吧?哎,大家看这里边我们的这个定义,呃,五秒钟延迟的时候啊,在这个代码里边,这是来了一个就是已经。呃,在在这儿啊,已经来了一个pay,对吧,来了一个订单支付事件,然后呢,没有来receipt,我等他的话等五秒钟,那所以大家说现在我在做这个interval join的时候,我是哪条流,Join用哪条流,是order event order pay去join用那个到账流对吧?哎,我们说是他要要等他的话要等五秒钟,那相当于应该是个什么,是不是上界等正五秒钟啊,对吧?因为这里边是不是它的这个时间可以比它大五秒钟嘛,诶所以这里边是给一个正五,那同样下届是不是负三啊对吧,就是这条流里边的数据是不是应该可以比它早三秒钟到啊,那相当于是他等他三秒钟对吧?哎,所以这里边我们就是给一个time seconds,给一个负三,然后再来一个time second,给一个正五,这就是我们所定义的这个上下界啊,当然如果说你你还想要定义说,就是说这个上下界如果不包含的话。
03:37
啊,你可以调这两个方法对吧?Exclusive这两个方法我们这里边就不需要了,接下来就只有一个方法能调,里边必须要去实现一个process drawing function啊,那比方说这里边我们列一个transaction match with drawing result。然后最后我们可以把这个result stream做一个打印输出,Print输出,最后不要忘记还是en为execute,执行起来当前是transaction match with draw,呃,Job对吧?接下来我们看一下怎么样去实现这些东西啊,Class transaction match with drawing result,它必须要实现的是一个process drawing function,对吧?然后里边的类型大家也看到了,1INPUT,二,还有一个output,类似于一个复杂的map转换啊,所以这里边我们可以INPUT1,那不就是order event嘛,第一条流嘛,然后第二条流receipt event啊,然后最后output output,大家知道这里边我们是不是匹配起来的话,那就直接输出就完事了,对吧?呃,包成幼儿元组啊,直接输出就完事了,所以还是order event。
04:57
和receipt event的一个组合,哎,这样一写,上面不再报错,里边必须要去实现的一个方法就是process element,然后大家会想到,我们要的不就是把它匹配起来吗?这里面不就是一个left一个right匹配就完事了吗?啊,那你out.collect left right,这不就完事了吗?就这么简单。但是大家会发现,之前我们还关键是想要去找到那个不匹配的那些数据,对吧?你现在能检测到不匹配的数据吗?
05:30
检测不到对吧?哎,大家想这里边的这个连接是不是相当于只有我们就是所谓的这个内内内连接的这种状态对吧?哎,只有就是直接把这个所有的数据直接去连接在一起,合并在一起啊,假如说在当前我们的这个范围内啊,你有一个数据,然后它没有对应的匹配的话,它是没有输出的,这里面,哎,所以对于这种场景而言,大家会发现你运行一下得到的结果就只有就是成功的那些事件匹配成功的事件能够输出,那如果不成功的话,我们其实更关心的是不成功的事件,对吧,那个其实是输出不了的,所以这里边的这个场景更好的应用还是之前我们直接用这个connect,然后去做这个两个处的这种方式会更加的,呃,符合我们这个应用场景一点好,现在这个代码已经运行起来,哎,大家看到这里边输出呢,就全是成功匹配的对吧?啊啊,然后这就是得到了我们运行的这个结果了啊。
06:30
当然这里边大家可能会发现一点,就是这里好像成功匹配的这个数据比之前好像少了一点,对吧?哎,为什么会少这些呢?其实大家如果看的话,现在我们这个时间范围定义的是非常非常局限的,对吧?这里面只有这个,你看4447,诶这个确实是在负三正五之间,4548这个没问题,9592这个也是没问题,负三对吧?啊9393,这这肯定没问题,这几个真是啊,没没问题,能够正常输出的,那前面我们为什么会输出那么多呢?
07:04
我们可以运行一下,再再看一下这个结果啊。啊,大家看到之前的这个输出结果,好像这个匹配的好多呢,对吧,如果我们仔细看的话,诶这个你看这个4447,这没毛病啊,4548,这也没毛没毛病,这6152,这怎么能够直接输出呢?诶这里要给大家解释一下,主要的问题在于哪里呢?其实也不是我们的这个代码有bug,主要的问题在于什么?这就是我们现在还是以这个文件作为数据源直接输入的,那大家想想文件读取数据的时候,它是不是本身就处理的特别快啊,它会导致一个什么问题呢?大家回忆一下那个water mark,我们现在是事件时间,对吧,你要触发当前我们那个定义的定时器,五秒之后,三秒之后,那是不是必须要water找到那个位置啊,但是water是多长时间产生一次呢?Water是每来一条数据就会就会更新一次吗?不是的,Water mark是,诶我们现在的周期性生成200毫秒更新一次,对吧,200毫秒生成一次。
08:08
这个,所以现在会导致一个问题,就是什么在200毫秒更新的周期范围内,是不是文件里面读取数据已经全读进来,全处理完了呀,就会导致什么呢?导致诶我认为处理的过程当中,认为他他们都是正常匹配的,已经匹配上了,对吧?但事实上呢,当前的这个water mark还没有变,对吧?啊就是它它正常匹配上是因为我water mark没有变,所以那个定时器也没触发,但事实上这个当前的这个时间戳已经超过了,对吧,它是要等到200毫秒之后,沃马又一次触发的时候发现,哦,我一下子原来这个时间已经跳变这么多了啊,它会来一个沃mark跳变,但是是,但是前面我们这个已经匹配检测输出的这个就没办法改变了,所以是主要是这样一个原因导致的,大家如果能一条一条数据做这个流市数据输入的话,你就会发现最后的结果还是正确的,对吧?啊,就还是只要不出现这个沃mark跳变的场景啊。
09:08
就还是按照我们那个三秒五秒的这个等待时间会输出正正常的匹配,其他就都是一个,就是超时之后就相当于匹配不上了,好关于这一部分我们就介绍到这里。
我来说两句