00:00
我们刚才已经完成了时间窗口的一个测试啊,那接下来我们再快速的做一个。技术窗口的测试,因为刚才我们没有做技术窗口测试,对吧?啊,所以接下来在这个代码里边还是新建一个测试的类啊,Window test2,好,接下来这个叫count。烫头一脚。呃,那同样前面的这个流程跟前面完全一致啊,我把这个就直接copy过来吧。首先创建执行环境啊,然后把这个我们还是从那个ET文本流里边读数据,对吧,这个看的更明显一点啊,然后转换成这个sensor reading对应的这个类型。然后最后有一个env execute执行起来,然后中间的这个开窗测试啊,呃,就是开一个技术窗口。技术窗口测试啊,那么这个例子的话,我们可以统计一个,就是比方说啊新明现在我们是这个sensor reading啊传感器数据,那我们还是统计一下,比方说我统计一个当前,呃,比方说连续三个温度值,呃,或者说五个对吧,呃,温度值他们的这个平均温度值吧,我可以做这样的一个统计,对吧,或者说这个甚至十个都行啊,大家大概知道这个意思就可以,所以接下来我可以做一个。
01:28
做一个计算啊,那么data stream要去开窗,是不是先做一个KBY啊,我统计平均温度,那应该还是要按照这个当前的这个ID来分,你你只统计当前传感器的平均温度嘛,肯定不会混着统计啊,所以接下来是k by ID,然后time window,呃,不是time window,当前我们要的是count window,对吧?那这里边我还可以要求,比方说我统计十个数的平均温度,另外呢。我要求隔两个数滑动一次,顺带我们把这个滑动窗口也测了对吧?啊,那大想现在这个状态是什么呢。
02:07
哎,是不是就是对,其实就是一堆数据来啊,我这个大家知道这这里有十个啊。啊,大大大概这样一个样子啊,十个数开一个窗口。然后后边是不是隔两个数滑动一次啊,是不是应该就是这样的一个统计方式啊,对吧?然后再往后统计,每一个窗口里边都是十个数这样的一个状态,这就是我们想要统计啊十个数的一个平均值,所以接下来大家看一下该怎么统计呢?Count window接下来做的操作是不是也是跟之前类似?也是可以做增量聚合,也是可以做全窗口聚合,那现在我们要做一个平均数的统计,那该怎么做啊?啊,当然你你可以全窗口对吧,全窗口把所有数据都统计收集齐了,然后最后是不是来一个for循环,全部加起来,然后除以个数啊除以十就完事了嘛,哎,那这里边我们想给大家介绍的是我可以直接做一个。
03:08
增量聚合,那大家想增量聚合的话,首先我想到的是那个,首先那个sum max这这就不行了,对吧,当前这个就肯定简单的这个聚合就不行了,那reduce可以吗。Reduce里边是一个reduce function,但是大家想到reduce function里边它有一个特点是这个大家还记得reduce function里边是不是它的那个数据类型不能改变啊,这个大家还记得吧?里边这个reduce,我们中间的那个聚合状态是不是必须得是sensor reading啊,那你这个sensor reading的话,大家想我可以把那个呃,就是当前的那个温度值做一个累加,对吧?但是我怎么样累加完了之后还要直接算它的那个这个平均数,然后就就直接放在这个3READING里边,然后再跟下一个做聚合呢?这个好像没办法这么聚合对不对?诶,但是大家看到前面我们做的这个aggregate function,它是不是就会灵活一点啊,它的特点前面我们不是说了它有三个泛型嘛。
04:14
它是不是就说明输入输出首先类型可以不一样,另外中间的那个ACC累加器聚合状态是不是也可以不一样啊,那自然这里边大家就想到了,前面我们这里面用到的是中间聚合状态是什么,我最后就直接输出了累加状态和最后输出结果的类型是一样的,那现在我是不是可以稍微改变一下,实现这样一个。求平均数的这种方法呀,哎,那你看我现在可以直接不要reduce了啊,我直接来一个agggate,我去new一个啊,那当然这个我可以用另外一种实现方式,前面我们是匿名类啊,我在这里边我定一个MY啊,Average a。Camp,对吧?我定义这样的一个类,呃,那大家知道这是public static class,接下来它应该实现的是一个什么?哎,Aggregate function对吧?里边的数据类型输入s reading,注意中间的这个聚合状态应该是什么?
05:18
首先最后输出的那个应该是double对吧?然后那有同学可能说那中间我的这个聚合状态也是double啊,但大家想一下,假如说我中间这个大家想啊,你这个double到底要存什么呢?你存的是和还是存的是,呃,就是我我当前那个已经算出来的那个平均数呢?假如说你算的是平均,首先你如果存的是和的话,后面是不是你根本不知道到前当前有多少个,那你如果要存的是平均数的话,后面你是不是也因为不知道它当前是多少个,导致你再来一个数的时候,你不知道它该怎么样去合起来计算对不对?你想之前我是五个数的一个平均数和之前是六个数的一个平均数,再来一个计算的时候,这个效果不一样,对吧?再来一个数,你不能直接加起来除以二对不对,哎,所以那这玩意我怎么算呀?哎,对,大家想到了,我这是不是中间聚合这个状态,我存两个数不就完了吗?我。
06:18
呃,存一个当前所有的呃求和的那个温度值,另外再存一个对当前的总个数不就完了吗?那当然要包装成一个二元组对吧?呃里边呃,那那这个求和的话double,对个数的话inte对吧,直接来一个这样一个东西啊,那所以有了它之后。大家看接下来我这个操作的时候,Create cuumulator,这应该创建一个什么呢?是不是你有一个二元组,一开始是什么?哎,对0.0对吧,呃,这个double类型啊,然后零对吧,二元组嘛,然后A每来一个数的时候怎么办?
07:00
大家想一下这个怎么办?哎,这还应该是你有一个temp对吧,然后接下来我要的这个当前的这个温度值应该是。大家想一下应该是什么,是不是原来的这个accumulator它的F0,然后再加上当前value的对get temperature啊,然后另外对另外是原来的accumulator的F1,再加上当前,哎加什么加一就完事对吧?Countt加一嘛,所以大家看是不是我现在要的是这么一个东西啊啊,然后最后get result呢怎么办?哎,是不是一除就完事了对吧?我现在最后要的不是一个结果是一个W类型的平均数吗?所以你两个状态是不是一除啊,所以accumulator.F0除以accumulator点F1是不是这么做啊?哎,当然后面这个还有这样的一个,呃,一一个就是墨的一个过程啊,这个大家如果想写的话,也可以把它完完完整的写出来,那这个大家知道是不是就是a.F0加b.FF0。
08:12
然后后边是A点F1加B点F1对吧,就是求和,然后合并在一起,个数合并在一起,就这样嘛,啊,所以这其实就是非常简单的一个过程啊啊这样就把把这个方法直接实现了对吧,那后边我这里边可以把这个挖一下,这就是一个,呃,当前的一个avg对吧?Temp,呃,Result stream,这是我们当前的这个结果啊,最后我把它做一个打印输出,这就是我们整个实现的过程。好,那接下来我们来测试一下啊,看看这个效果怎么样,同样还是NC。这里边起一个这个NC啊,然后接下来一条一条数据做一个统计计算,那这个其实我就不用不用赶时间了,是吧?啊大家想这个就慢慢慢悠悠输就行了,因为它是按个数来的嘛,对吧,首先你看这个三四十一三十,呃36.3,然后三四十一三十五点八。
09:21
诶,大家看为什么这里我就直接输出了一个结果啊。这有点奇怪,说好的十个嘛。为什么他直接这就输出一个结果啊。这个是不是有点奇怪?首先大家算一下吧,36.3和35.8,如果做这个平均数的话,应该是多少?哎,对,大家想这这不是30就不用管了,对吧,这不就是前两两个加起来是11,这两个加起来01:12点一除以二是不是6.05啊,哎,没问题对吧,36.05这个算是算的对,但是为什么它是两个就输出一个结果呢。
10:00
啊,那我们不要着急,然后继续往后看对吧,我们看那个三四十一还有什么,还有还有32.8,对吧,32.8。哎,这就不输出了,对吧,然后后面还有什么数据来着。37.1再来一个,诶大家看又输出了一个35.5啊,大家大概的算一下啊,大概大概看一看哦,32.8 37.1差不多对吧,跟这个36.05大概再合起来这个差不多35.5,这个问题不大,大家如果感感兴趣可以完整的算一下啊,诶大家看现在的输出频率是不是就是两个输出一次啊,诶那我们就想起来了,之前说过窗口的输出频率是谁来决定的,滑动不长对吧?啊,滚动窗口那个是按当前的这个窗口长度来输出的,其实我们知道滚动窗口是不是就是不长等于窗口长度的滑动窗口啊,所以还是按照不长来输出的,对吧?所以这里为什么是这样一个输出结果呢?那就是因为对大家想到前面两个这个数据来了之后,它应该属于哪些窗口呢?首先大家可能会想到啊,如果我要是把这个完整的十个数据。
11:13
都来了的话,这是属于这样的一个,呃,十个数的一个窗口,但是这个窗口我们说是滑动的嘛,所以后面再来两个数,这又滑一次对不对。哎,那问题就来了啊,在后边的话,再来两个数,再划一次。问题来了,那能往后边滑,能不能往前面滑呢?诶,这个大家自然就想到了,之前你那个时间窗口可以往前面滑,是不是这个技术窗口理论上来讲也可以往前面滑呀,它相当于统计的是什么?就是你从现在开才开始统计之前的数据我是没来,但是这个窗口该输出的时候,我还要按两个两个一个间隔去输出,对不对?哎,所以这就相当于是我前面是不是这个窗口要这个啊,这个这个窗口就相当于是要做一个这个补齐对不对?尽管前面这里边没有数了,但相当于这八个数是不是也应该有一个输出啊,同样是不是前面这六个数也应该有一个输出啊,那前面是不是前四个数和前两个数都应该有一个输出啊?
12:18
所以大家就发现了,我们当前是不是前两个数就输出了第一个结果36.05啊,然后后面前四个数是不是输出了第二个结果35.5。啊,所以大家如果愿意去尝试的话,你会发现后面就是六个数的结果啊,然后八个数的结果,十个数的结果,最后你发现到十个数之后,再往后它不会再变成12个数的结果了,再往后统计是不是就不会算前面两这两个数了,往后挪在十个数的结果了,对吧?哎,所以这就是这样的一个滑动窗口的统计过程。大家感兴趣可以下来之后把这个技术窗口啊,技术滑动窗口在代码里边实现一下。
我来说两句