00:00
来呢,我们还是在代码里边,当时我们已经引入了事件时间语义,也给大家设置了这个water mark啊,当时我们设置提取这个时间戳,设置了一个呃,30毫秒的延迟,对吧?啊,那这里面30毫秒的话,大家可能看的不是特别的清楚,那为了表示这个给大家做测试看的清楚啊,我们直接设置一个三秒钟的延迟,因为本身我们这里边的数据,大家看到这个都是秒嘛,对吧,我们都是按秒来算的,你如果要是按毫秒算当然也行,但是呢,呃,这个可能就是说整体你使用的过程当中,就跟我们实际不太一样了,因为大家知道啊,我们在这里边,呃,这个比方说你在Linux系统里边直接看一下当前当前的这个date的话,比方说我们这个加百分之S这里直接就能打印出现在的这个时间戳,对吧,你看现在的时间戳就按我这个打印频率,你看它变化情况是不是这个十位,这应该表示的是一个秒啊,对吧,就是从1970。
01:00
今年1月1号零点哈,那个伦敦时间,到现在为止经历过的这个秒数是这么多,对吧?如果你要是用那个毫秒数的话,那显然不,不仅仅这么些,后面还得还得有三位啊,所以你如果认为这个是一个真实的时间,就是比方说是最近,比方说几个月,对吧,几年之内的一个数据的话啊,那你当然是应该把它当成秒来做计算,后面乘1000,那你如果要是说有的同学说我这里边就直接不乘1000,那会发生什么事儿呢?也不会发生什么事儿,弗link照样做计算,只不过他就把这个数当成了一个毫秒,哎,那大家想你这个数当成毫秒的话,那就相当于是什么呀?他认为这个时间,这个数据产产生的时间应该就是1970年那会儿离得很近的一个时间了,对吧?啊,这就变成这样的一个毫秒数了嘛,所以这可能跟我们具体的实际的含义表达不一样,但flink其实可以正常去做,这里只是跟大家稍微的强调一下,就是说我们真实。
02:00
的这种场景下,一般情况啊,按照我们现在时间幺五几打头的话,十位是一个秒数,13位才是一个好秒数啊啊,那那这里边我们来给大家做一个测试,要做测试的话,你就不要用这个,前面用这个呃,文件读取对吧,我们还是起一个这个NC,用这个流式的数据一条一条输入,这样的话效果会更加好一点,所以接下来我们在这里边还是起一个NC-LK7777,接下来我们运行一下这个代码,给大家测试一下这个效果啊。好,我们把这一部分来运行一下。哦,这里边,呃,我们这个代码可能还稍微有一点问题,就是我们允许处理的迟到数据,后边呢,还把这个数放到,就是最后的漏网之鱼放到特殊出流,但是我们的特殊出流没有去没有去把它拿出来,对吧?所以我们这个特殊出流你你没做任何操作啊,哎,我这里边也不要去做操作了,因为这个做操作我还得去再去找之前的那个数,对吧?这个稍是相对有点麻烦,我只要拿到它不就完事了吗?那怎么样能拿到呢?Get set out theut,然后同样这里边要传一个output tag,就是我们当时定义的同样类型和同样标签的那个output tag,这个就指代了我们特定的一个测殊枢流啊,所以如果要是这样的话,我可以直接在上面把这个就定义出来,对吧,呃,Later tag直接把它定义出来。
03:32
我们这里面就直接传一个tag不就完了吗?Tag对吧?然后下边把这个获取到,然后接下来我把它打印输出对吧?注意打印输出的时候不同的流,我把它做一个标记对吧?这里边是late,这里边是result,然后接下来我们把这个代码重新运行一下,看看这个效果怎么样。接下来我们来给大家一条一条做测试啊,现在我们就是真正来看他行为的过程了,好341,我们先来一条数据。
04:05
这里边没有没有复制成功了啊,我重新copy过来,嗯,还是没有复制上,诶这里边这个好,这里边我们输了一条数据,这个是当前是三一对吧,这是我们,呃,就是只输入了第一条数据,后边呢,我可以输入不同的这个SEN4类,呃34ID的这个数据,但是大家知道这个其实对于我们当前341没有影响,对吧,因为已经分组了嘛,窗口他们都是各自统统统计各自的,这个没什么区别啊,然后我现在要给大家提一个问题,就是说第一个窗口应该什么时候关闭呢?大家觉得应该什么时候关闭第一个窗口好,这里面大家如果不知道它到底什么时候去触发第一次的这个关窗操作的话,那我们就一条一条试呗,对吧,不停的把这个时间戳往后推移就完了,诶这里边比方说我给一个呃,这个哦,这里边又给了重复的一条,这个347是。
05:05
是吧,这个不对啊,没有把这个复制上,我把这个三四十复制过来,然后现在大家注意啊,现在我最大的时间戳是205,那现在我的时间是多少呢。因为代码里边我们给了water mark,大家还记得这个water给了一个,当前给了一个,呃,延迟时间是三秒,所以当我输入一个205数据的时候,现在我的真正的时间进行到的时间其实只是202对吧,应该要13秒,那所以大家看就是202之前如果要是有有窗口的话,该关就关了,那现在既然没有输出,那说明并没有202之前的窗口要关,对吧?那我们就想15秒一个窗口,到底这个第一个窗口是哪个点呢?哎,那我们就一个个往后往后追加数据吧,对吧,然后来一个,诶,这里边又复制错了啊。
06:04
然后206,大家看现在我的是不是涨到了,涨到了这个205啊,对吧,然后来一个208,大家注意,208来的时候,我当前的water mark是不是涨到了205,诶,那大家想205看着像是一个像是一个整五的这个数啊,对吧,15秒一个窗口,它居然没关吗?哎,它真的没关啊,真的没关,这没什么好说的,那我们直接就再来一个210。那还是没我们一个二。诶,大家看到213进来的时候终于关窗了,所以我们知道现在关的其实是哪个窗口啊,213进来的时候,当前的water mark涨到了210对吧,而且大家看我们这里面统计这个三四十一最小值的时候,你看他统计210的数据了吗?
07:01
没有统计210的数据,对不对,29.7明明是最小,它没有统计二这个,呃,这个29.7210的数据,你看这这就是我们当前的这个最最大的那个时间戳嘛,它只统计到了208的数据,所以说这里面是什么呢?当前统计的这个窗口就是15秒一个窗口,那是不是应该就是往前推15秒就应该是195。到210啊,就是这个窗口对不对,包括195,不包括210,好,那大家继续往后想,下一个窗口什么时候关闭呢?哎,那15秒嘛,下一个就应该是210~225对吧,就应该是这个样子啊,所以接下来我们还有还有什么呢?还有一分钟的这个呃,延迟时间啊,我们先测这个窗口的这个输出的结果啊,先看它的这个15秒一个窗口这个结果,然后我们再测这个乱序数据啊迟道数据,好,那接下来比方说啊,我这里边假如说有有这个乱序,那大家会想到我可以给一个这不是213吗?我可以给一个212啊,那那大家知道我给一个比较小一点的数啊,28.1这个是最小了,对吧?哎,那如果来一个这个数,他是不是能收到我们后边210~225后边这个窗口里边来呢?哎,当然是可以的,对吧,你这个213其实它在。
08:27
后面是完全没有问题啊,那后面那个225那个窗口到底什么时候关闭呢?那我们知道这就必须得等到二二啊,有同学说那你等等到225嘛,诶你是可以等到这个225,但是225的时候没有窗口输出,因为现在的water water mark只涨到减三,只涨到222,那你要是想要输出225的窗口,让225的窗口有一个结果计算输出,那是不是必须要228的数据来了之后才会触发它的关闭,呃,这个就是窗口的这个结束操作对吧?计算操作啊,诶我们这里面来一个二二,你看输出了一个341,又来一条信息,大家看这里边最大的时间戳是212对吧?212,那因为225这个我们说前壁后开嘛,210~225的窗口是不是就包含210,不能包含225啊,对吧,所以大家看到。
09:27
好,这里边我们最大的时间戳是212,然后呢,诶28.1包含进来了,对吧?呃,就这里边前面我们这个乱序并没有受到影响,这是我们这个watermark本身你做了这个有三秒的延迟嘛,这个肯定是可以把它处理能搞定的啊,那接下来我们要看的是,你看现在我们以为对于这个三一啊,这些数据,你看210的这个窗口啊,我现在说的210的窗口就是以210作为结束时间的窗口,还有这个225的窗口,看起来都做了一次结果输出了,好像都做了一次计算了,对吧?那是不是这个窗口就关了呢?
10:05
诶,接下来我们继续现在这个窗口,不是已经有这个结果输出了吗?我再来一条之前,哎,比方说213的数据,对吧,我来一条数据,呃,比方说这个。呃,给一个这个数据小一点对吧,我给一个19:19点五,大家看他实时的就在我们之前统计的结果基础上,直接来了一条数据更新,这条数据是什么呢?就是刚才我们的这个,因为最小值也更新了嘛,19.5对吧,然后这里边给了一个213,诶为什么这里给的是213不是我们最大的那个时间戳呢?这跟我们的逻辑有关嘛,你当时这里边处理的时候,我们就是用当前最新的这个的时间戳直接放在这儿了,对吧?因为我们没考虑乱续,我我直接就把这个最的它的时间戳直接放在这了,没有取最大值,那你如果要是总要取那个当前最新的,是不是我们这里面还应该调整一下逻辑,用那个最大值呀,对吧?这样的话,这里面就应该,呃当前这个啊,呃,但当然这里边你如果要取一最大值,大家看之前本来是212是吧,那你现在是不是还应该是213啊,对吧,这个好像看不出什么区别啊,我这里。
11:19
咱可以给一个比方说我再给一个215,那大家想我给一个27.6对吧,这个大一点,那大家看更新之后是什么呢?最小值还是19.5对吧?但是现在最大的时间戳变成了215,这个不是最大啊,我们的逻辑是最新的时间戳变成了215,对吧?所以大家看他的行为是在这等待的一分钟时间内,是在之前统计结果的基础上,来一条就输出一一次,来一条就输出一次,对吧,直接就把它这个在之前的结果上更新了,不要等到这个一分钟结束的时候再去更新啊,所以这就相当于是什么,就是我们说的那个拉姆达架构啊,就是来了一个我们这里边啊有一个这个任务就是数据先来,来的时候呢,我们先定义窗口,这个窗口是比方说九点钟到点准时输出一次,然后这一次输出了之后之前来。
12:19
的数据是一直攒着的对吧,一直我们说增量聚合它是不输出的,然后九点准时输出一次,这说所谓的准点是说的这个按water mark对吧,考虑了watermark延迟之后的准点,然后呢,哎是那就是延迟这个迟到的数据,那就是每来一条就输出一次,每来一条就输出一次,就变成这样的一个实时更新的这样一个状态了啊,所以大家要把这个行为要考虑清楚啊啊那最后我们还有一个,那假如说等到一分钟之后漏网之鱼怎么办呢?啊,一分钟之后大家想我们当前这个225这个窗口,225要关闭的这个窗口,对吧,一分钟之后,那是不是应该要加60秒啊,225加60秒,那就是285对吧?哎,那有同学可能想到了,我这里边给一个285不就完了吗。
13:08
给一个285啊,然后20随便给一个对吧,28,那大家看这里边为什么又有一个输出呢?这个输出是啥呀。这这个输出并不是我们之前做了一个叠加,对吧,这并不是要叠加,而是因为285这个数进来之后,前面我们又有窗口关闭了,对不对,对吧?因为你后面还有225228这两个数据,它是不是应该属于225,再过15秒到240那个窗口啊,所以这个窗口里边又统计了一个最小值29,然后最新的watermark是228,它也关了,呃,它不是关了,它输出了一次超过240了嘛,对吧,你现在都已经二八几了嘛,啊,那超过240了,当然它是要输出一次计算结果啊,那这里面大家会注意我当前的之前的那个225那个窗口关了吗?
14:01
注意我再输入一个之前类似于我们这个二幺几的数据,对吧?二幺几的数据它就包括在当前的这个210~225的这个数据范围内,我给一个24.4,诶大家看它还能继续输出,对吧?之前我的最小值还是19.5,然后现在最新的autom变成了2218了,还能更新之前的值,说明这个窗口还没关呢,为什么还没关啊,为什么还能更新呢?因为我们现在注意延迟一分钟也是以water mark为准的,你现在来了一个285,哎,那现在water是多少呢?是282对吧?282跟我们那个225的那个窗口比起来啊,要那个结束时间225的窗口比起来,相当于只过了57秒,还等着呢,对吧?那所以我们就知道了,什么时候真的就关了呢?当然就是再把那个water mark的延迟。
15:02
三秒延迟时间加进来对吧,我再来一个来一个288,这个时候大家看这里边并没有任何的输出啊,因为后边你该关的窗口里面都没数嘛,对吧,我们后面的那些窗口都没数,所以说没有任何的输出,但是退移了到285了,到285的话,那是不是等待60秒的这个时间,诶,我们相当于前面225的那个窗口就彻底关闭了呀,这个时候如果我再来一条数据,比方说我这个218对吧?呃,我来一个219的数据啊,这个无所谓啊,你随便来什么都行,大家看它是不是就变成late了,这个时候真的就把它扔到测输出流,然后把它拿出来了,你这个时候要判断的话,是不是就得判断我当前这条数据,诶,到底属于哪个窗口,对吧,判断一下时间戳属于哪个窗口,然后哦一看219,它属于这个215到二,呃呃,210~225,对吧?啊,所以我再把它叠加到之前215~225的这个这个数据。
16:02
跟这个数据去做对比,做一个叠加啊,就是做这个操作就完了,这就是我们完整的一个测试和处理流程。
我来说两句