00:00
刚才给大家做的这个简单测试是一个增量聚合的测试,那有同学可能会感兴趣,就是对于这个全窗口函数来讲,又是一个什么样的状态呢?哎,那给大家还是举一个对应的例子啊,上面这一部分是。增量聚合函数,那下边这个是。全窗口函数。全窗口函数啊,那接下来我还是先做一个k k by ID,然后接下来是不是要去做一个type window对吧?啊,这里边我还是啊,直接给一个time.SECOND15秒,给一个15秒的窗口,然后接下来我做这个全窗口函数计算的时候得怎么做呢?哎,大家会看到哈,我这里边就不能调这个reduce aggregate了,那我接下来要调的是。大家看调的是一个方法,叫apply。
01:00
Apply这个方法里边应用对吧,应用这个方法里边是不是本身默认传的这个参数最简单的啊,是不是就传一个叫做window function的这样一个接口,实现这样一个接口的类啊。Udf函数类对吧,然后这里边这个window function大家看必须要实现的一个方法,也就叫apply对吧?啊,所以这个跟我们前面这个是不是都很像啊,基本上实现的过程都很像对吧?那我们看一下这个你有一个window function啊,你有一个window function,那这里边你看这个window function,它有这个四个泛型,这个泛型更多啊。这类型更多,首先是in out,那这当然就是输入输出的数据类型,这个比较简单,那这里面KK又是什么呢?前面我们不是做过那个分组嘛,对吧?所以你看在这个window function里边,它可以拿到当前key的信息啊,所以这里边获取到的信息就比较多,另外还有WW是当前的window对不对,Window的类型,所以当前它是不是能拿到的信息就会更多啊,你看当前的K是什么类型,它默认得到的啊temple对吧,是不是元组类型啊,因为这是前面我们用了这个字符串来指定当前的这一个K,那是不是得到的这个k stream就是一个元组类型对吧?K stream的里边的这个键就是一个元组类型,所以后边这里边window function里边也是一个元组类型,然后后边就是一个time动类型,对吧?然后得到的这个结果前面我们也想了,想要得到一个in体对吧?哎,这样的一个整形的输出,我就是统计一下个数嘛。
02:37
好,那接下来里边必须要实现的是一个apply方法,那这个apply方法里边大家看一下里边这个参数啊,这稍微有点多,首先是temple类型的一个temple,这是个啥呀。这就是当前的key对吧,这就是当前的键啊,所以这个整体来讲这个还是比较简单的对吧?啊,然后另外还有就是window window是不是现在也也也可以包含在这个信息里边啊,这都没问题,对吧?然后另外还有就是大家看input input不是就是当前的。
03:12
所有输入的数据啊,对吧,就在这里边都有了啊啊,然后另外还有一个out out,大家一看它是个。Collector,所以你一看这个apply方法是不是没有返回值类型,所以那我们最后要输出这个窗口的计算结果,靠什么输出呢?al.collect对,所以大家想这是不是就实现了之前那个flat map的那个方式方式呀,那所以flat map不是可以一对多吗?那大家想是不是我现在也可以一对多,可以很自由的输出,对吧?或者大家想那个flat map的那种方式的话,你如果不输出是不是也行?是不是我不输出就是不调alt.collect不就不输出吗?因为你这里边apply本身没有返回值类型吗?都行对吧?呃,所以这个就是非常的灵活了,它整体来讲就会比较灵活啊,那其实大家就想到那之前我们这个输出其实我觉得有缺陷啊。
04:03
大家看这个你这个全窗口函数,如果要是说跟前面这个对比的话,你你这里边可以这个呃,就是直接来一个就加一次,来一个就加一次,最后直接拿到这个聚合结果就输出了,这看起来更高效,而我现在是不是相当于是把这个数据全攒齐了的呀?啊,那大家可能会想到,那攒齐了之后,我的这个最最终的这个这个个数应该是多少呢。那是不是直接就对大家其实可以想到啊,我是不是直接就把这个input,它是个interable类型对吧?呃,然后我拿到它这个erator,就是大家可能会想到我这里边是不是直接可以去拿到它里边的个数就可以了呀,所以可以直接就是用这个ator u对吧?U,所以这里边我们可以直接调用ator us的to list的方法或者to瑞方法啊,直接把它转换成一个这个其实也是可以的,后边这个就直接可以用它的size啊,对吧,直接已经拿到了一个list的话,这个直接获取这个size就没有问题了啊呃,这个其实只是一个基本的这个使用,大家如果要直接写一个for循环,其实效率上来讲不会更复杂啊,只是就是代码可能会多几行,这个其实没有什么问题,然后接下来,那最后我们想输出的其实就是这么个东西是吧?呃,那就是把这个inte是不是直接输出就可以了。
05:27
那大家想我这里边应该是怎么样。接下来是怎么样输出呢?out.collect对,直接把这个count是不是直接输出就完事了呀?啊,所以当前我做的这个操作,大家看跟上面这个其实是可以得到相同的东西,我这里边做一个。呃,这样啊,我们把这个定义一个result stream2,然后接下来我们把这个result stream2做一个打印输出。接下来还是在这里边我们做一个统计啊,大家可以看到这个统计跟之前有什么区别,或者说呃,有哪些不同。
06:08
好,我这里边先给一个341,然后给一个346。大家看统计这个数据肯定就是从我第一条数据输入之后,对吧。然后三四十三四一哦,这里边已经输出了,大家看这个是不是就是111,因为我分别是不同的K嘛啊,所以这里边肯定是这样啊,然后接下来我继续输这个三一,因为15秒我们这里边可能快速的输入不了多少条数据啊。好多输入几条数据,然后这个直接给一个346吧,让大家看清楚一点啊。诶给一个这个346。然后大家看到。当前我的这个状态是多少?诶大家看到这里面有点奇怪啊,这里面为什么我输入输出的是121这样的一个一个效果呢。
07:05
为什么是这样的一个效果呢?哎,这里呢,可能就有点奇怪了,我不知道这个到底它输出的这个结果是怎么回事,对吧?啊啊,这里边我们可以就是做一个就是具体来讲啊,做一个其他的一个考察,可以做一个改进,因为大家发现这里边直接输出的这个结果啊,比较比较奇怪,只有一个count数,我根本都不知道这当前到底是谁,对吧?呃,然后另外也不知道当前到底是哪个窗口,那所以大家会想到你像之前我这个直接做聚合的时候,增量聚合的时候能输出其他信息吗?好像输出不了,但是现在全窗口函数,尽管效率上好像比之前那个稍微低了一点,因为是收集齐了所有数据,最后才统计它的个数,对吧?哎,那但是现在我是不是可以拿到其他信息啊,哎,所以接下来我可以做一个更改啊,我的输出不要输出一个音体值,不要输出一个这个对,大家想到我是不是可以输出一个元组啊,比方说我输出一个三元组,我要的是什么呢?是当前的啊,Sensor ID对吧?一个string sensor ID,然后另外还需要。
08:16
还需要什么?啊,就是我我当前是不是还需要当前的这个window的信息啊,那这个window的话,大家会想到我用什么来代表当前的window呢?啊,其实这个简单,我用它的那个结束时间或者起始时间是不是就可以了,我们用结束时间吧,结束时间因为刚好是他触发计算的那个时刻嘛,这个比较明显对吧?啊,所以这里边结束时间,那这里边window里边有一个长整型的时间戳,可以作为它的结束时间长整型对吧,那最后还有一个当前的那个个数。呃,顶体质对吧,把这个定义出来,这是一个三元组类型啊啊,那所以接下来哪些地方就得改了。
09:00
哎,这里面out就得改了,对吧,然后另外。这个是不是也得改了,对吧,把这个改了啊,那当然了,这里面就涉及到这个问题了,Count我是拿到了,那前面还有一个就是当前的那个ID呢。String,一个ID,对吧?ID怎么拿呢?是不是还得从这个呃,Temple里边去拿呀,对吧?那所以这里边我可以去get,大家看是不是可以get field,因为是个元组嘛,对吧,Get field,这里边我直接GET0是不是拿到就是我当前的第一个位置的这个数据啊,对吧?啊,所以这里边大家看到就是呃,Get field啊,你这里边就是直接从零开始的一个一个索引位置啊,就是这样的一个元素啊,然后另外还有就是因为我当前就是一元组嘛,对吧,只有一个一个key啊,另外还有就是当前的那个window end,大家会看到,比方说我定义一个window and,那怎么样去取呢?可以基于window的信息get and,大家看这个是不是拿到的就是一个。
10:04
就是一个长整型的and呀,对吧,大家看这个and其实就是我当前在这个window里边定义出来的一个属性,对吧?哎,直接获取就完事了,所以这个其实还是比较简单的,呃,所以在这个全窗口函数里边,我能拿到的信息就会多很多,最后是不是输出一个三元组,这是ID window and,还有一个count,是不是这样就完事了?好,接下来我们再来运行一下啊,大家看看这个测试的效果怎么样。我们还是把这个做一个分屏显示这边啊,还是啊前面这个可以,我到时候可以这个输出的这个稍微的快一点啊,3411个346。一个SENSOR71个。三四哦,这个这个太快了啊,我们没来得及对吧,大家看只有341346这个是不是它都是呃,因为我们这个输入的数据后边三这个三位这个是毫秒数对吧,毫秒数所以都是零,所以大家看这是020啊大概这个20秒结束的,呃,然后你看这个347是不是就归到下一个窗口了,对吧,前面我输输入输出的输入的慢了啊,所以它是不是15秒之后。
11:20
35对吧,035,然后结束的一个窗口,这是这个347啊,接下来我输入的这个再快一点啊,刚才我们测的时候是后面连着给了几个这个341对吧,341我就直接都给三一吧,大家知道我只统计数量嘛,然后346。再来一个346,诶大家看这里边有了,大家看为什么这里边我统计哦,这个统计是对的对吧,这个统计因为这就看的很明显了啊,他当前结束的这个窗口是什么时候呢。这是30秒之后的一个窗口,那为什么35加15,也就是050的那个时候没有输出结果呢?
12:02
对,大家想没有数据吗?没有数据,那那是不是就相当于没有输出啊啊,所以当然就没有任何东西啊,然后接下来这里是再下一个窗口,是不是就是65秒啊,65秒这里你看一是有三三个,六是有两个,这是在之前基础上统计的吗?不是,是不是累加的呀,对吧?所以这里面假如说我继续给啊六诶大家看很快就给了一个一,这是为什么呢?因为刚好到点了,对不对,110对吧,刚好到点了,所以这个就就就直接把它归到这一个窗口里面去了,后边如果我们再输入这个一的话。诶,大家看后面又又收进来两个对吧,所以一是不是就得等到下一个窗口了啊,所以接下来我再多输入几条,一大家想这个会基于什么去算,会基于之前的三去算吗?不会对吧,大家看这个是四,我连续连续输入了四条,它是四对吧?这都是统计当前这个窗口内的数据。
13:03
啊,这就是我们当前统计的这个过程,大家通过这个例子也可以比较出来全窗口函数和增量聚合函数的一个简单的区别,对吧?它就是把所有数据都收集起来,效率上看起来是低一点,但是它有好处啊,它收集起来你能干的事情就多,而且是不是在这里边还能拿到当前的key的信息,Window的信息,能够包装起更多的信息,让我们看的更明显啊,对吧,这个就看的就更明显一些。呃,这是关于这个window function,这个全窗口函数啊,另外还有一个全窗口函数,这是之前给大家说过的那个叫对process window function,那它调用的时候呢,要去点process,然后这个里边去扭的就是一个,哎,大家看到就是一个process window function对吧?啊,这就是也是一个全窗口函数,然后这个全窗口函数它是一个抽象类啊,它里边必须要实现的方法是一个process方法。
14:02
它跟那个window function呢,基本上差不多,大家看这里边是不是差不多啊,也是有一个K,然后有这个orable类型的elements,那大家想这是不是所有的数据都在这,然后另外还有一个out对吧?Collector输出的时候用它所不同的是window function里边这里是一个window,而它这里是一个上下文,对吧?啊,那这个上下文大家就想到了,上下文里边有没有window呢?是不是里边也有window啊啊,所以就是包含的东西更多一点,能做的事情更广一点,对吧?啊,这就是这个全窗口函数的这个用法啊,我把这个就注掉了,大家就知道这个全窗口函数和增量聚合函数的差别了。
我来说两句