00:00
另外在table API和弗link CQ里边呢,还定义了另外一种窗口叫做over window啊,Over window大家知道这个over子句啊,其实是标准CQ里边已经有的一种聚合方式,对吧?我们在查询那个select子句的时候,其实就可以直接定义这个over子句啊,那所以之前我们在这个have里边大家也接触过开窗函数,对吧?那其实是差不多的啊,这个操作基本上是差不多的,那我们需要注意的呢,就是over window它在做聚合的时候跟group window前面我们那个group window不一样,我们说group window就是它的一个窗口,然后就是属于当前这个窗口的所有的数据都放进去,对吧?哎,都在里边去去做统计,最后输出的是一个结果,而我们现在这个over窗口呢,那这里边是每一行数据,大家要注意啊,就是每一行数据都会计算它跟周围相邻的行去做聚合,然后得到一个结果,对吧。
01:00
哎,每一行数据来了之后都会得到一个结果,所以这个over窗口它其实更像一个啊,大家就会觉得更像一个map操作一样的感觉,对吧?但它不是map,因为它会涉及到自己周围的那些相邻行,然后做聚聚合的一个计算,它肯定涉及到状态的,对吧?啊,那具体来使用的话,这个op window呢,同样还是点window去调用,前面我们也看到了,点window里边传的参数可以是一个group window,也可以是一个op window,对吧?哎,这里边可以传一个open window,然后注意后边也是必须要给一个别名,对吧?SW,然后呢,后边我们直接给一个select select里边定义基于window做的一些聚合操作,这个聚合操作呢,你就不能像前面我们那个group window一样,直接点sum就完事了啊,因为之前我们那个是做过group by的,对吧,你是基于窗口做过分组的,你现在又没有做分组,没没有做分组的话,我怎么知道要。
02:00
对当前的这个定义的这个over window去做聚合呢,哎,所以这里边要多加一个表达啊,多多加一个这个over,然后加上当前的这个窗口对吧?所以整体调用也还是非常简单的啊呃,那对于table API来讲呢,同样呃它我们在之前那个group window里边提供了tumble session啊slide对吧,提供了这样几个类型,然后来表示这个group window111呃一组的那个转换操作得到最后的结果啊,表示一个group window,那这里同样table API里边呢,提供了一个类,这样就可以实现我们这里面的这个window的这个参数的要求啊那接下来我们可以看一看代码里面到底怎么样去指定,这里边就分了不同的这种指定方式了,我们可以首先是可以在不同的时间语义下,对吧,事件时间,处理时间都可以指定,然后呢,可以指定,因为不是说要跟它周围的那个行去做聚合吗?哎,那这。
03:00
里边我们可以指定按照时间间隔去聚合某些行,或者呢按照行数计数,对吧?到底周围的多少行去做聚合,这个是都可以去定义over window啊,那这里边整体来讲呢,分成两大类,一类就是无界的over window,当然另外一类就是有界的了啊,我们首先看无界的,无界的是什么意思呢?那意思就是说全量数据对吧,你你不用说什么那个,呃,当前那个多少行或者多长时间范围内的,我就是到目前为止的所有数据都聚合起来,这就叫无界的over window啊呃,那这个看起来这个就就有点有点像全量数据的一个一个聚合,好像就没有必要开窗了,对吧?啊,但这也是一种定义的方式啊,我们看一下具体怎么来定义,那over window里边其实也是可以去指定当前的K的,而且推荐大家去指定K,为什么呢?你指定了K之后,接下来我们做操作的时候,就相当于有了K代码啊,那后续做的那个操作。
04:00
就相当于真的可以就是并行去处理了,对吧?啊,要不然的话这个并行度就会损失,那over window里边的K,它的这个分组是怎么样去定义的呢?Partition by对吧?哎,这里边就是over,然后后面protectition by,比方说某一个字段A,然后接下来呢,Order by order by就是排序对吧?因为你既然是要指定一个范围嘛,当然是得排好序,这样的话才有意义。比方说我现在order by your time,事件时间,基于事件时间指定一个无界的over窗口,然后基于A这个字段去做分组,对吧?呃,然后接下来来看有一个字段叫preding,就是之前向追溯到多少对吧?追溯到哪里呢?啊,这里边给一个常量值叫做unbounded range啊,也就是说啊,没有界限的一个范围对吧?没有无界的一个范围,向前追溯到无穷远,那就是有多少数来多少数,对吧,所有的数据都聚合起来后面。
05:00
给一个别名SW,这就是这个在table API里边定义over window的一种方式啊,最常见的一种方式,呃,那那后边大家看到,还有就是处理时间也可以对吧,那只是在后边我们这个order的时候,哎,你就不要用这个RO time了,你要用pro按照处理时间这个字段做一个排序,然后我呃截取之前从头到尾的所有的数据啊当然有同学可能会说,你既然都已经是这个呃所有数据了,那就没必要再排序了吧,啊对,是的啊,Order by这个并不是一个必须呃必不可少的这个,呃呃,这这里大家注意啊,就是order by是确实是必不可少的,后边我们看那个就是方法调用的时候啊,层层调用你就会发现这个order by还是必不可少的,所以说尽管看起来最后的效果没什么,好像是没没没什么有用的这个结果啊,但是这个必须给出来按照什么去做排序,因为后边我得知道就是正常来讲。
06:00
的话,他大家想啊,是不是相当于我当前在这个当前的这个表里边啊,做的这做的这个判断,我当前有一行数据对吧,一行数据写在这儿,他要跟之前之后的数据,如果要去做一个聚合的话,那你是不是必须,如果不同的这个排序对它这个聚合的结果有很大的影响啊,一般化来看的话,当然是这样,所以说我们需要有这个orderbi定义,当然这个proceeding是不是必须的,为什么你这里边你既然都已经是无界了,对吧?那这个其实可以省,Proceeding是可以省的,然后与之对应的,它后面其实还可以定义一个什么呢?定义一个following,就是proceeding是说向前聚合对吧,聚合到多少,那following指的就是向后聚合啊,聚合到多少,多少条数据对吧,但是现在这个following呢,其实呃,没有太大的意义,为什么呢?因为following最多就只能到当前这条数据,因为我们现在是流式输输入对吧,大家想。
07:00
一想你如果要是这个呃,就是批处理的话,我们如果要是直接在一个关心数据库里边去做这个操作的话,它前后确实都有可能有数据的,这是没问题的,但是在流式处理里边,理论上来讲啊,如果说我们这里边呃经过这个处理之后,那是不是呃就就是相当于就应该是到目前,特别是这个处理时间啊,那到当前这个数据最新进入来,进来的一条数据,你做操作的时候,是不是相当于只有前面有数,后面应该没数啊啊当然有同学可能想到就是那你要这么说的话,乱续数据啊,我们那个那种情况下,不就有可能出现这种况吗?对吧,就有可能他呃他后本来排在后面的数,呃也有可能提前已经来了嘛,有可能出现这种情况,但是现在的table API和flink s还不支持去聚合,就是它following超过自己的那些数,就是你排序之后啊,就order by排序之后超过自己的那些数,还不允许直接去做聚合啊。所以也。
08:00
现在你那个following其实没多大用对吧?啊,那另外还有就是说不光可以基于一个时间范围去做聚合,还可以基于行数啊对吧?呃,多少行去做聚合,那如果是无界的行数,那怎么办呢?那那大家看这里边给的这个常量就不太一样了,之前那个叫unbouned range,现在叫unbounty的肉啊,其实最后效果差不多,因为我们知道你如果都是无界的话,都是从头到尾嘛,然后前面你还必须都要指定一个排序的字段,对吧?一般情况我们都是按照时间去做的排序,那就可以区分事件时间,处理时间啊,这就是这个无线over window的一个定义方法啊,然后还有就是有键的啊,更常见的其实就是有键window的一个定义方式啊,那这里面我们就要用一个时间间隔呃,来定义当前这个时间范围,对吧,或者说按照一个行数来定义我们当前聚合的这个行的范围啊,所以说这里边比方说。
09:00
怎么定义呢?同样还是potential by某一个字段K,然后呢,Order by可以是roll time可以是pro time,呃,各种时间语义对吧?排序排序完了之后呢,我可以proceeding one minutes啊,就是之前一秒钟A1分钟之内的所有数据,然后我以当当前这个数据为基准,对吧?之前一分钟之内的所有数据聚合起来,哎,那这里边大家注意就是是不是跟我当前自己这个数据的当前这个roadtime也有关系了,对吧?而我要取的就是当前自己的,再往前推一分钟这个范围内所有的数据开一个窗口聚合好,那同样后边我们还可以给一个比方说十行Rose,就表示以自己为基准,之前的十行数据拿出来,对吧?啊,当然就是也包括自己啊,合在一起,相当于是11条数据做一个聚合啊,那最后就可以得到这样的一个结果了啊,那每一条每一行数据都做这样的一个处理,最后。
10:00
就呃,这首先是我们定义这个over window啊,那后边是还要定义基于它去做一个,基于它去做一个这select,然后里边调用那个呃,聚合函数对吧,然后over这个window,那就可以实现了,这是关于这个table API里边的用法,那另外还有就是CQL里边的用法,CQL里面大家知道这个,其实CQL本来就有类似的这种over over window的这种表达,对吧?你像有一些数据库里边,像这个Oracle里边本身就有这个over这个语法啊啊,那这里边我们看到它就是开窗函数这种用法嘛,比方说我们select count amount对吧,Count这个amount的这个数量,然后over over一个什么窗口呢?大家看着position by对吧,User,然后order by pro time跟我们前面那定义是不是一模一样啊啊,然后后面roses,诶,注意这里就是指定了一个行数的一个间隔对吧,就是BETWEEN2PREC,就是之前的两行and current啊就是。
11:00
直到当前current对吧,当前这一行为止,所以这样的几行数据做一个聚合啊,那基于这一个统计啊,每一行对应的统计一个amount的count值出来对吧?得到的就是一张结果表啊,所以这个其实CQL里边表达还是比较简单的啊,那大家需要注意的就是说我们必须要定义相,就是所有的聚合,你这里边要做的啊,就是所有我们有这个聚合函数的这个操作啊,必须都是同一个窗口的对吧?必须有相同的分区排序和范围,然后呢?呃,当前我们只仅仅支持当前行范围,之前的窗口之之后的那些数据收不进来,对吧?那这里面必须在单一的时间属性上指定对吧?就是这个order by,这个我们当前的这个时间属性啊,那所以这就是一个op window的用法。
我来说两句