00:00
我们现在已经知道window API到底该怎么使用了,那接下来我们还是在代码里边用一个具体的事例来测试一下,看看它的行为到底是什么样的啊,那这个例子我们这个也比较简单啊,就是要统计啊,就是我们开每每15分钟,呃,15秒吧,呃,15分钟的话等下测这个我们等太久了是吧,呃,每15秒统计一次。呃,窗口内所有温度的最小值啊,当然是按照这个各个传感器了,对吧,窗口内呃,各传感器温度的最小值啊,所以这就相当于什么呢?这个需求就相当于我们要开一个15秒的滚动窗口嘛,所以这边我们已经开好了啊,然后接下来啊,这里边我把这个放到上面去啊,都放到上面的这个注释住掉的这些行我都放到上面。
01:04
呃,这个后边我还是把它写出来,这个是一般我们用做技术窗口的一个定义,这是滚动,滚动技术窗口。然后接下来我们其实就要开的,既然是15秒统计一次嘛,统计当前15秒内的最小值对吧?呃,那接下来当然就是开一个15秒的滚动时间窗口了,直接用time window这么一写就完事了,然后接下来我们就要进行操作,操作大家知道要传这个窗口函数,那如果要是非常简单的只拿这个最小值的话,其实我们实现非常简单啊,大家知道我直接命就可以对不对啊,或者说我直接命BY,在这里边命和命BY没什么区别,因为大家看我只有这个map成二元组了对吧?然后它前面的这个ID都一样,都是当前的这个温度,呃,这个当前的传感器ID嘛,后边这个温度值呢,我只要取出最小的放到这儿不就完了吗?啊,所以这里边你命明白都可以啊啊,那可以直接怎么样,这个命调用的时候,你看它的传递的这个方式啊,传参的方式还是给一个string类型的当前的这个。
02:21
字段的名称,或者说给一个int类型当前的这个位置,对吧?呃,这里边我们能调的这个,呃明白啊,就是这两种方式跟我们之前那个KY之后直接求它的最小值的这种方式是一样的,所以这里边呢,其实就是你只能在这里边给一个比方说我们是第二个元素去做一个,呃,做一个最小值的统计,那当然就是命一对吧?啊,这种方式整体来讲可能会,呃,就是比较简单,但是说我们就想到了有一些这个复杂的需求肯定就实现不了了,对吧?啊,那比方说像之前假如说啊,我是要把这个前面那个呃,Time step啊,我还是要留下,比方说我把它map成一个三元组这样啊data.time呃。
03:11
Time把这个也保留下来,对吧,然后还是按照ID做一个分组,然后后边呢,我们又来了一个需求,输出的时候呢,就是还是输出三元组,ID不变,保持不变温度呢,取当前的最小值,以及大家还记得之前我们那个需求吗?对吧?以及最新的时间戳那那那这里边大家又会想到了你这个如果直接用一个明白显然就搞不定了,诶那我们现在怎么办呢?这个其实也非常简单,你用一个一般化的reduce不就完了吗?啊,所以接下来我们做一个reduce啊,这个reduce还是一样,大家还记得之前我们那个传递我可以做什么呢?这里边可以直接给一个reduce function,或者传一个传一个这个函数,对吧?大家比较习惯的应该是这种写函数的,呃,就是写直接写一个匿名函数的这种方式,那我们这里边先给大家写一遍这个啊啊就是这两。
04:11
两个参数同样还是第一个参数是之前聚合的那个结果对吧?啊,之前叠加起来的那个规约出来的那个结果,所以我们把这个叫做呃,这个current result吧,然后另外还有一个叫做就是当前的这个new,然后经过转换之后要得到的是什么呢?啊,又是一个三元组对吧?因为大家知道这个处理的过程当中,这个函数就是当前的这个数据类型是不能变的,下面你这里边也是所有的数据类型都是一样,如果是三元组,三元组,这里全是三元组,所以这里边我们取什么呢?呃,当前结果的ID。哎,不是ID了,因为三元组那得是一下划线一对吧,放在这里,然后接下来,所以大家看上面我们这个三元组没什么没什么用,其实就是把这个它的顺序调了一下,对吧?之前我们第二个是这个时间戳,现在调到第三个位置了啊,其实只是这样而已啊,然后接下来我们看后边第二个位置是当前的温度值,要最小的温度,那当然就是current,呃,Result它的下划线二跟我们当前new data的下划线二,要去做一个取最小值的操作,对吧,要去做这样一个操作,这是我们这个呃写法啊,然后后边还有第三个元素,那就是当前的最新的时间戳啊,那我们就直接用new data的下划线三把它放到这里就可以了啊,这就是一个非常简单的一个reduce的过程。
05:51
啊,那或者呢,我们也可以自定义实现一个reduce function对吧?My reducer这里边大家注意我要实现的大家意啊,这里边实现的这个呢,我们之你看到就是就在这里啊,Reduce这里面上面我们可以传一个方式,对吧?跟我们之前调用这个呃,K外之后去做reduce的时候,大家看用的这个是一样的,对吧。
06:27
都是这里边的这个Java接口叫做reduce function啊,所以大家会发现就是底层我们开窗之后啊,它也是先分组再开窗嘛,开窗之后做的这个规约,做的这个聚合,跟之前我们直接基于这个分组的数据去做的这个规约聚合其实是一样的,对吧?都调的是API common functions下边的reduce function,好,那这一部分接下来我们就可以把这个数据类型传进去,当前是这个s reading对吧,里边必须要实现,诶不是sensor reading啊呃,啊,这个我们现在是一个三元组了,对吧?这里调的话是一个三元组了,我这里面假如说不做转化,那我直接用s reading也是可以的,因为我们那个三元组其实没有做一些本质的操作,对吧?啊,那这里边我其实要的就是VALUE1VALUE2,那大家知道VALUE1就是已经聚合出来的结果了啊,最后其实你要得到的那还是得包装成一个三瑞啊,那里边第一个当然还是用当前的ID对吧,然后用支。
07:27
之前的temperature温度的最小,诶我们第二个参数是sensor reading的第二个参数是什么?是time stamp是吧?那这里边不能用这个啊,我们应该用VALUE2的当前的呃,这个最新的这个,然后最后一个参数是VALUE1的temperature,去取一个最小值跟VALUE2的temperature做一个比较,对吧?啊,所以这个其实是一样的一个操作,那我们可以直接在上,你把下面这个map都注掉对吧,直接你在上面这个this stream啊,K by ID,然后我们直接调一个这个呃,Time window,然后reduce,你直接把这个my reducer传进来就可以,这种调用方式也是OK的,那这里边大家就看到,诶,我这里边是。
08:20
注掉了一个东西是不是。好,我先把这个写出来啊,下边我们这个最后应该还应该有一个行对不对,这里边,然后呃,上边如果说这个得到结果之后啊,我们把它这个result stream做一个打印输出,好啊,那接下来我们可以给大家来测试一下啊,看看现在这个效果怎么样,运行一下看一看。看到这里边已经运行结束了,诶大家看到这里边没有任何的输出,这里边为什么没有任何的输出呢?大家能理解这个是什么原因吗?啊,大家会看到这里边其实是直接就退出了,为什么会退出呢?就是因为我们现在读的是文件,直接从个文件读取,读完了之后处理完,哎,是不是他就直接退出了呀,对吧?哎,这个过程其实是没有任何问题的,那这里边直接读进来退,一直到所有的数据处理完到退出,大家想这个有15秒那么长吗?根本没有对吧?诶当前就那么几条数据,前面我们看到这个等待的过程,大部分都是这个编译的过程啊,直接运行起来之后,其实非踌一瞬间就过去了,那一瞬间就过去了,它已经退出了,是不是相当于我们这个窗口还在等着,还没关啊。
09:39
那当然当前没有任何的输出了,对吧?你这窗口还没关呢,然后这个所有的数据都已经处理完了,退出了啊,所以这其实呃,大家如果要认为这是这个就是有界的这个文件输入的一个一个bug也可以啊,但这确实就是它的一个特性,对吧?我们这里边你有界输入完了之后,它就会关闭,那么你现在还没有到达等待的这个时间,那它真的就关了啊,那这个当然就看不到结果了,所以为了避免这种情况,我们可以换成换成一个socket文本流的输入,对吧?我们把它改一下,Strip input stream,我们直接用socket textpe,但是大家用卡夫卡也可以,因为卡夫卡相对来讲可能稍微的麻烦一点,重一点,我们做一个轻量级的测试,对吧?这里边我就直接写死了啊,不写那个参数了,O host777定义一个这个,然后呢,接下来我们就直接去起一个,呃,这个NC7777,先把它放在这,然后接下来。
10:40
再去运行这个代码,接下来啊,大家就想到了,你这里面至少它不会直接退出,它不停的在那儿等,对吧,我这儿输输输数据都可以输的慢一点,然后你看看是不是能够得到对应的那个结果啊,所以接下来好,现在已经提起来了,对吧?好,那接下来我们就来一条一条输数据了啊。
11:02
好,首先是这里面的第一条数据,这个三三十五点八这条数据啊,三四十一输入,然后三四十六入,大家注意看这的这个输出啊。大家知道应该是要等15秒对不对,诶大家看这里边直接到点就直接输输出了对吧,而且大家会发现这里边输出的时候几乎是同时输出的176,而且大家发现跟我这里边的这个输入的顺序还不一样,为什么输入顺序不一样呢?因为大家想这个到点输出的时候是不是分组,每我们说这个每个组里边的时间都是对齐的,对吧?哎,输出的时候这个分组各自的那个数据是不是就是同时几乎是同时输出的呀,对吧?啊,所以说这个时候它几乎是同时输出,那我们这里边其实你在做这个,呃,就是最后打印输出的时候,这个先后顺序其实就比较随机了啊,因为大家知道这就相当于就看你在做那个我们当前并行度是一嘛,那相当于他在竞争那个CPU资资源,谁先竞争到,然后谁先把这个结果做一个输出,哎,那刚好就先出来了,对吧,那后面的就排到后面了,所以这里边并不是严格跟我们这个输入数据的顺序有关的,因为它那个最终关闭窗口输出的时候就是一个时间点啊啊,那大家再来看一看啊,诶大家看我刚才又说了这么长时间话了,为什么后边没有没有再输出了呢。
12:34
对吧,看起来这个只输出了这么这么一次,后边我们等了这么久了,好像没有再输出,为什么呢。啊,因为大家想到前面这是15秒的数据,对吧,我是滚动窗口没有重叠的,哎,那接下来的15秒是不是我啥都没输入啊,什么都没输入,没有数据,那你说你让人输出什么嘛,当然就没有没有输出对吧?甚至可以告诉大家,Flink的底层,如果当前窗口内没有数据的话,他连这个窗口,连这个桶都不会去创建啊,啊所以说就是你可以认为这个窗口就是有了数据来了之后,他才去创建,才会去到点去关闭,对吧,才会做这个计算,没有数据的话就不会触发这一切啊,那接下来我们就继续给大家输入这个数据啊,来一个十,诶大家看这为什么我直接刚一输入它就它就输出了呢?为什么这个看起来好像没有等15秒,像前面我们感觉好像等挺长时间的,对吧,这里为什么直接就输出了呢。
13:34
啊,大家仔细一想就能想到,因为接下来我们这个15秒一个窗口,15秒秒一个窗口,第一个窗口确定了之后,大家想到是不是后边这窗口的位置就都确定了呀,对吧?因为头连尾,尾连头嘛,我们一开始输入数据的时候,可能是你连着输入,诶那感觉是等了大概15秒之后输出了第一次结果,然后后边输的这一次呢,我是不是有可能这个这个数据刚好在这儿输入啊,然后他刚输入这个数据,马上就到15秒刚好截止的这个时间了,所以这里边他就突然就看到它输出了一条,对吧?啊,所以这个行为是可以理解的啊,那接下来我们还是看看这个最小值它有效没有,对吧?341,我们多输入几条341啊,这是32,呃,这个二零八三十六点二,诶大家看到现在我输入输出的是什么呢?最小的温度值32.0,然后是208当前最新的这一个时间戳对吧?啊这个大家就看到这个。
14:34
特点了啊,然后我再再输入这个29.7,然后再输入这个30.9,接下来我们如果要等下一个窗口输出的时候,大家能想到肯定就是最小值应该是29.7对吧?呃,诶大家看到这个等时间稍微长一点,29.7,然后当前的时间,除了最新的是213啊,这就是当前这个啊窗口它本身输出的一个特点啊,给大家做了一个测试。
我来说两句