00:00
接下来我们要介绍的呢,是flink CQ当中的聚合啊,这其实我们知道就是C当中非常常见也是用的最多的一个功能了啊,那就是对于一张表里边某一列所有的数据,或者是符合某些条件的多条数据做一个合并统计,得到一个最终的结果值,比方说诶,某一列我们做一个count。算当前到底有多少个啊,然后针对某一列做一个萨,做一个求和计算,或者说做一个求平均值,或者最大最小值啊,那所有的这些操作就叫做聚合查询,其实这个是对应着我们之前在data API里边介绍过的所谓的聚合转换的啊。也就是所谓的。Reduce aggregate一系列的这些操作,那在flink当中呢?Flink CQ本身它就是一个流处理、data媒PI和标准CQ查询相结合的一个产物,所以在flink当中的聚合查询啊,就可以整体来讲分成两大类,一大类是流处理里边特有的聚合,另外一大类是CQ里边原生的聚合查询方式。
01:18
啊,那这里边如果说要按照这个标准去划分的话,我们想到什么叫做CQ里边原生的聚合查询方式呢?像之前我们所说的,直接上来之后写一个count,然后group user这样做了一个查询啊,Count当前每一个用户分组之后来进行统计,进行聚合,这样的一个运算显然就是CQ里边原生的啊,那如果说我们是上一节提到的基于时间字段,然后使用窗口表值函数定义一个窗口,然后接下来再使用count sum对应的聚合函数去进行聚合的话,这个显然就是。Flink流处理里边比较特有的窗口聚合了啊,那所以主要是分这么两大类啊,那接下来首先我们先讲最为基本的分组聚合。
02:09
这个里面其实一般我们对这个都非常熟悉了啊,都是通过一些内置的聚合函数来实现的,比如前面我们已经用到的count啊,那还有some max m AV表示average求平均值,这些都是非常简单,也是应用非常广泛的。它的特点当然就是针对。多条输入数据直接进行计算,得到一个唯一的值啊,那当然了,呃,像有一些函数啊,使用的过程当中,我们可以后边不对。当前的所有数据,当前的表指定分组的K,比如说我们可以直接像count这个就肯定没必要嘛,直接统计数数量嘛,我们可以直接select count芯from当前的,这是完全没有问题的。但是更多的情况下啊,我们应该还是要指定当前分组的K,然后是按照这个标准啊,每一组统计出一个最终的结果,所以后边往往是要跟着一个group句的,那之前我们就是这种用法了,Select user,然后count URL from group by user,那就是按照用户去分组统计每一个用户。
03:25
他的访问点击URL的总次数,也就是看这个用户的活跃度有多少啊,那这种聚合方式就是我们所说的分组聚合。已经知道了,从概念上讲,这样的一个分组聚合,其实跟data streamam API里边的K之后做的那个聚合是完全一样的啊,那只不过CQ里边是,而之前我们做的操作是K而已啊,那后边我们所做的这个聚合计算都是针对当前K来进行划分聚合,进行聚合统计的。
04:00
那在流数理里边,我们知道当前要写这样一句CQ进行聚合查询的话,显然这是一个持续查询啊,因为数据是不停的来的,因为table里边的数据是要不停的增长的,那这个时候当然我们得到的结果也会不停的变化,也是一个动态表,那这个动态表呢,有可能会出现更新操作,因为我们很简单的啊,做这个count统计,或者说呃,如果要是some map的话,同样也是有可能更新之前某一个K对应的那个数值的,所以有更新操作的这样一个动态表,或者说得到的最后应该是一个。更新日志流啊,那所以我们想要把最终的结果表转换成流或者输出到外部系统的时候,就必须使用。之前介绍过的撤回流或者更新插入流的编码方式啊。如果说直接在控制台打印的话,那就不能直接to data,而是要。
05:05
转换成更新日志。另外还有一点就是在CQ里也类似于之前我们介绍。Three API里边的状态编程一样,在聚合里边,它其实也是要保存一个当前的状态,那在持续查询的过程当中,我们的K如果非常多,而且不断增加的话,那相当于我们要维护的状态也会不停的增长。为了防止无限增长下去,我们知道无限增长下去,那最后一定就就是耗尽内存资源啊,那所以在table API CQ说这种接使用了一个叫做cable conig的这样一个类的对象啊。这就相当于当前表环境的一个配置项了,我们可以基于当前表环境直接调get con FA方法获取到当前的table con,然后在里边直接去set idol state rotation这样一个参数,给一个当前的时间duration,这样的话就可以了。
06:17
比方说给一个60分钟啊,60,那很显然这就是一个一小时的T,也就是说当前如果说状态在内存里面已经保持了,创建之后啊,如果没有任何的更新访问。已经保持了一小时,没有任何操作的话,那就直接可以回收药。可以直接清掉啊。或者另外一种方法,这是在代码当中直接去配置,针对当前的提交的这个作业有效,那或者呢,我们也可以使用这个配置项的方式,配置项的话,那就当然是也可以在配置文件里边去定义,也可以在代码里面啊,代码里面的话,本身这个配置项是table.ex e.state.ttl这两种方式是等效的。
07:06
那当然了,如果要是配置了T时,结果就可太准,所以相就牲结果正确性,为换取了我们这个内存资源的一个释放。那另外呢,还有就是在flink CPU里边,我们知道呃,还可以调用一些比较高级的一些。方法,那比方说其实也是一些现成的CQ里边的语法函数啊,比方说distinct distinct可以直接进行聚合。聚合结果的去重啊,那所以去重之后再做聚合,呃,我们之前说到UUV相当于就是PV的一个去重处理嘛,啊,那所以使用这种方式想要去直接做UV的话,比我们在datatpi里面去实现就简单多了。那另外也可以使用像这个heavy,对结果进行一个条件筛选啊,或者说使用这个group sets分组,设置多个分组情况啊,这些其实都是可以的,这跟我们在标准CQ里面的用法完全一致。
08:08
那这个整个的过程其实就是。知道对应的这个TTL配置怎么配,然后其实在CQ里边直接用就可以了,跟。我们所熟悉的C中的用法基本上是一样,那接下来我们在代码里面还是把这个再来做一个简单的回顾吧,啊,那接下来我们接着之前这一部分代码已经有了。当前这个表的定义以及时间属性的定义,那接下来我们要做的操作其实是。做聚合查询转换了。聚合查询转换。首先我们能想到的最简单的是所谓的。分组聚合。这也是我们能想到的最一般化的这个聚合方式了,我们可以直接调用table CQ query方法里边写一句完整的Q,那比如说我们现在就直接select啊。
09:09
User。非常简单,Count这个大小写没关系,函数调用,那我们知道可以直接。Count URL,这就是我们之前所说的,当然了,这里不写URL,直接COUNT1也是一样的啊,我们就直接相当于统计当前的这个数嘛,所以这里我们可以直接from当前的。C table。这里我们需要注意,在前面我们已经注册了一个click table,所以这样的话,我们用的就是直接创建表的时候,DD这个定义啊,已经注册好的,而后边这个table呢,我们只是有这样的一个。Table对象,它并没有直接注册在当前的表环境里,所以我们用的还是上面这个。啊,那接下来我们看一下from,必须后边要跟上做一个分组,这个最简单的方式当然还是user。
10:07
这样就完了啊,所以整体来讲还是非常容易理解啊,能够想到最后到底是什么东西的啊,那这里我们可以把这个写一个。保存一下当前转换之后得到结果当然也是一张表啊,我们把它叫做a table。做了一个分组聚合得到的表。当然最后如果说我们还想看到这张表里到底是什么东西的话,那可以非常简单的做一个。A去做一个two date。好,当然我们这里面。就需要去调用c en to方法,诶这里需要注意,如果直接to stream的话,显然最后是没有办法把它正常打印过来的,因为当前是有更新操作的,这之前我们都已经说过了,所以这里边我们要调的是。
11:03
Re,然后接下来啊,那就是把a table做一个输出,然后直接print。这样的话就完成了。如果我们想要做的是流,要打印输出的话,我们还需要注意,既然是流处理的话。最后还应该要有Env.TE要执行起来,我们用到这个的执行环境啊,要处理当前的。然后这里边可能还有一些小细节,就是我们需要注意当前的这个定义,Click,这里边我们声明的字段啊,其实并不是user,是一个username。哎,所以这里边我们如果要是完整的去实现的话,那应该是要把这里的group改成user,如果是用就是当前我们。用第二种方式定义出来这个table的话啊,那这个是可以直接使用这个user啊,这里需要注意一下。
12:01
哦,另外这里还有一个问题,就是我们这个create dl其实是没有创建表的,没有执行的,一定要记得还要把当前的。这个表首先要创建出来,所以是SQ create d传,这样的话接下来我们就打,就可以看到当前的数据是什么样子了,我们可以加一个前缀,叫做agg。我们看到首先输出的是当前的,就是上面我们有一个print,把click table,它的表结构做了一个打印输出,然后接下来我们看到读取了当前文件之后。这里可以看到,就是按照当前每一个用户他访问的次数。Marry啊,加I,因为我们是to change out stream嘛,所以一开始MARY11次点击,BOB11次点击,然后ALICE1后边Bob再来的话,那就是Bob减u bob1加u bob2更新一次,这是retra stream撤回流的表达方式啊,然后另外再再来一条爆B的访问数据,那就是把二再改成三,就跟我们之前测试得到的结果其实是完全一样。
13:20
除了这种比较简单的分组聚合之外,我们还可以把另外一种情况也包含在这里边,就之前我们所说的分组窗口的定义,分组窗口的聚合过程啊。这一部分我们就不把它放在窗口聚合里面讲解了,我们直接把这个老版本的分组窗口聚合放在分组聚合这里面一并说完就可以了。或者我们这里单独的可以说一下,这个叫分组。窗口聚合。这个调用方式我们其实现在已经不常用了,只是做一个简单的介绍,我们如果看到以前比较老版本的一些这种flink的书写的这个代码的话,大家可能会看到类似这样的一个应用,我们也应该知道啊。那这里我们。
14:11
为了看的更清楚一点,可以把这个CQ进行一个换行的书写。我们后边再加上一行,哎,那这里边的话,我们可以select当前的username。然后啊,同样我们可以count。啊者count u,这都可以,As c是我能想到的,首先应该要的。两个基本的聚合之后的字段,另外我们还需要有一个当前的窗口相关的一个字段,那比方说我们现在是一个最简单的滚动时间窗口啊,那滚动时间窗口的定义那是一个。Tumble。然后后边加一个括号啊,那如果要提取相关的窗口信息的话,那我们应该选取它的tbo start或者。
15:03
Tumble end。我们这里选取一下t end,当前窗口的结束时间作为一个字段表示当前的时间,那这里我们可以去把之前定义好的,诶,对应的这个时间属性字段要提取出来了,ET就可以放在这。后边需要加一个interval。到底是多长时间呢?最简单,我们就来一个十秒钟吧,因为当前我们的数据间隔时间都不长,Ten second。在这里可以给一个。别名就叫做。好,这个我们所有的字段就都已经定义完了。接下来可以。再去写接下来的事情呢,就是from当前的click。然后接下来GROUP1定要去做对应的分组操作,不后边的话。
16:01
要加的是。当前的username。本身类似于K的时候啊,分组的那个K是必须要的,除此之外,另外还应该要有一个。当前窗口的定义,窗口的定义其实可以直接把这个抄下来。只要把前面的T改成T就可以了。这就是一个滚动时间窗口,当前的时间属性字段是ET啊,它是事件时间属性的定义,那我们现在当然就是一个事件时间滚动窗口了,十秒钟一个。那为了看清楚,我们前面把这个可以再多空一格,后面这个是from。那当前的这个数据可能稍微有一点问题,因为我们好像只有十秒之内的数据啊,那没关系,我们可以后边再追加两条数据。比方说Mary可以在11秒的时候再来一次点击,Bob可以在13秒以及后边15秒的时候各来一条数据,这样的话我们的数据更多就可以。
17:05
体现出不同窗口的聚合结果了。那当前只是做了一个。做了一个CQ的执行得到的,那这个应该叫分组窗口。Window。Result。当前这个表结构我们就可以注掉了啊,那类似的这个功能。就可以直接放在这里。只不过要把当前的。Group window放在这里。我们可以加一个group。Window。看一下当前的结果是什么样的。运行我们看一下。啊,当然这个会夹杂着前面这个A的内容啊,我们只看当前的group window就可以了,我们看到只有这样的五条数据啊,所以这里面因为我们只有。
18:06
从一开始一秒开始到最后只有15秒的数据,所以当然就十秒一个窗口的话,只有两个滚动窗口,哎,那就是零到十秒一个窗口,呃,这里为什么是这个零八呢?呃,我们知道时间说都是从1970年1月1号零点钟开始的,呃,它应该是标准UTC时间,也就是格林威治啊。伦敦那那个地方的标准时间,零点开始,00:00:00,然后计算的一个毫秒数值,所以我们这里直接给出来的啊,它本身转换之之后,我们提取之后,转换之后也是标准时间戳的这种。格式嘛,也是time stamp嘛,所以这里其实本身是一个。UTC时间的1970年1月1号的零点零点整啊,那对应的我们在本地时间的话,我们现在在北京东八区的话,当然就是一个早上八点钟了,我们的时间要比伦敦时间要早八个小时嘛,所以我们可以看到第一个窗口它的结束时间输出是1970年1月1号的八点零十秒,那当然了,第二个窗口当然就是八点零二十秒了,结束时间了,所以统计出来的那就是Mary在第一个十秒零到十秒之内有两次点击,Bob有六次,而爱ice只有一次。
19:28
那同样后边我们追加的这三条数据看的就很明显,Mary是一次报两次,这就是当前分组窗口的聚合统计结果。如果我们细心的话,其实会发现这里只有加I操作,因为之前我们也说过,对于当前的窗口聚合计算而言,中间的计算过程,那它是中间要保存一个当前的。聚合状态的,我们可以做增量聚合,每来一个数据就叠加,叠加一次,但是窗口的统计结果它不会输出啊,它只是在内部默默的统计更改状态,并不会修改我们结果动态表里面的那个值,所以当前结果表其实没有任何的更新操作,所以所有都是在后边追加的一个结果,当然就都是加爱。
20:19
那这样我们就能想到,这就是所说的,并不是所有的聚合都会带来表的更新,都不一定非要表示成更新日志流,哎,那既然只有加I操作,这不是相当于就是一个仅插入流吗?On stream吗?那当然在进行转换的时候可以不调用to方法,直接to data stream,其实也是一样的,我们可以。直接重新运行一下,看一看结果是否跟之前还是一致。我们可以看到完全没有问题,直接to date stream也可以得到相同的结果,就是关于分组聚合和分组窗口聚合的操作。
我来说两句