00:00
我们已经了解了link CQ当中窗口的含义啊,那前面我们也说了啊,这里定义的窗口相当于只是一个窗口分配器,那具体的窗口函数呢,其实是使用了系统内置的一些聚合函数来进行实现的啊,所以我们其实知道啊,对于这个CQ而言,本身它其实一个最大的特点啊,提供最大的一个功能就是做各种各样的聚合统计,比方说哎,我们就是针对某一列的很多条数据啊,做一个合并,做求和来做一个S,或者说求一个最大最小值,或者计算一个平均数等等等等啊,所有的这些在C口当中都提供了一系列的系统聚合函数,我们可以很方便的把数据进行分组之后得到聚合的统计结果。所以这种操作呢,一般在CQ里边我们就叫做聚合查询aggregation啊,那在flink当中呢,同样link CQ也要支持CQ这些原生的聚合查询方式,在此之外,弗link还提供了非常有特色的流处理里边的聚合查询啊,那就是前面我们提到的啊,针对这个时间窗口的聚合查询,所以接下来呢,呃,我们也主要就是可以分成两方面来去介绍一下link CQ当中的聚合查询方式啊,那主要就是一类那是CQ原生的聚合方式,另外一类呢,就是流处里里面特有的聚合方式。
01:25
首先我们来看一下这个原生聚合查询里边最经典的一种聚合,那就是分组聚合,好那这个在CQ里边所说的这个聚合呢,都是分组聚合啊,本质上来讲就是首先我们针对当前。表里边符合某种要求的一些数据,我们先做一个提取,提取出来之后呢啊,就按照某一个字段直接做一个合并啊,这就是我们所说的分组聚合,那一般情况我们都是通过一些内置的聚合函数来实现的,比如说非常熟悉的啊求和sum,那最大最小值max me,那如果要求平均值的话,之前我们在datacpi里边还要自己去定义那样一个LG的方式啊,现在当然就很方便了,Avg啊,那直接统计就可以了,还有之前我们那个技数也不需要像那个word count啊,很笨的一个一个WORD1WORD1,然后做一个S,这里边我们直接count不就完了吗?诶所以在CQ里面最好的啊,最方便的一点就是有很多现成的函数可以直接调用,呃,那如果要总结一下这样一个聚合的特点的话,那它的数据转换方式其实就是多对一转换啊,因为我们收集的其实就是多条的输入数据,然后呢,最终计算之后得到的是唯一的一。
02:40
个值是一个多对一的转换啊,那具体在调用的过程当中呢,其实不一定非要用group by去进行分组啊,啊,因为我们可以对全量数据直接进行统计嘛,比如说我们看直接select count芯,From table,那其实这个count芯这就表示我们直接统计这张表里边所有数据的个数啊,那如果说不带group就相当于是不分组,那就是所有数据嘛,更多情况下呢,我们还是要goodbye指定一个K来,然后做一个分组统计的,比如说前面我们说过的啊,统计每个用户他的访问点击次数,那这样的话我们就是select user count URL,直接调这个count做一个次数的统计啊,那么C啊,做一个重命名,From当前的even table,这张表后边要指定分组的k group by user,这就是我们所说的分组聚合。
03:34
所以我们发现啊,所谓的CQ里边的分组聚合,其实就跟我们在data three API里边啊,K之后按键做分组之后,然后去执行对应的那些reduce或者s max做聚合转换是完全一样,本质上是一样的啊,只不过我们现在呢,是在流处理里边做这样的一个聚合查询,那当然了,这个查询就是一个持续查询了啊,所以。
04:00
之前在代码当中我们也看的非常的明白,如果说这里边我们用到了这样一个分组查询的话啊,那接下来得到的这个动态表里边就会有更新操作,哎,所以我们说啊,得到这个动态表里边出现更新操作,这是一个更新查询,那么接下来这张结果表如果想要转换成流打印输出的话,那就必须要调用to changelo stream方法,这样的话,我们在这个控制台打印输出的时候,是把它用了一个撤回流的方式啊,就是如果更新一条数据的话,那是一个减优撤回之前的一条数据,然后再加U。插入一条新的数据,用这种方式实现了更新这样一个消息,这样一个操作的传递。这部分非常的经典,我们之前也已经做过了,所以我们就不再详细去做测试了啊,那如果说我们还想实现其他的需求的话,那其实就是在这里按照CQ里边的写法啊,调用不同的函数不就完了吗?Max means some,随便去调,只要符合我们CQ标准规范的那些都可以直接去用,那另外这里还需要去强调一点的就是。
05:04
因为这里是一个持续查询,所以呢,哎,这里我们用于分组的这个K啊,有可能会不停的增加,比如说我们当前啊,Group by user,或者说by u ID,那在一个网站当中,用户可能是不断增长的,我们要处理的这个K分组会越来越多,那么计算所要维持的维护的这个状态就会持续增长。啊,那所以我们一般情况啊,为了防止这个状态无限的增长,耗尽系统资源,所以一般情况下啊,我们都要去配置一个所谓的状态生存时间,之前我们在状态编程里面就提到过这样一个概念,那么在table API和C组当中呢,也可以进行配置,之前我们在状态编程的时候是使用了状态的描述器去定义TTL,现在呢,呃,就不太一样,我们可以直接。在表环境的配置项中配置一个全局的TTL啊,所以我们可以直接获取当前table env的config,调它的get con方法,得到当前的table config,然后呢,调用它的set idle state retention这样一个方法,也就是说如果当前某一个状态啊,I豆当然就是空闲了,已经空闲超过这样一个时间之后,比方说我们定一个60分钟,一个小时。
06:21
一小时,如果当前这个状态已经一直空闲,不再被使用了,那就相当于到达它的生存时间了啊,接下来就可以把它清空释放了。啊,当然了,我们前面这种方式呢,这是使用了代码当中全局获取这个table config啊,获取配置项,然后方法调用的方式去进行设置的,同样我们也可以使用啊,类似于配置文件里边配置项的这种方式去进行设置啊,啊,那如果在代码当中去写的话,那就是configuration.set stream set什么呢?当前的cable ex e.state.ttl配置这个项,把给它一个对应的值,就设置了我们当前所有状态能够生存的时间。
07:03
那这里需要注意就是说如果配置这个状态生存时间的话,哎,那到时间他就会被清掉啊,那假如说啊,清掉之后,可能这段时间这个用户他的状态一直没改变,那之后这个用户他状态又来了新的数据呢,诶那这个时候就有可能会造成我们统计结果的不正确啊,所以这种方式是以牺牲正确性为代价,换取了资源释放啊,就让我们的这个资源不要太紧张,不要耗尽。哎,那另外呢,在这个分组聚合当中,其实有很多这个经典的CQ操作,CQ函数啊,都可以去直接使用,比如说我们可以直接使用这个distinct的去进行去除啊,所以在C当中我们要统计UV其实是非常非常简单的,直接的就可以了,然后另外呢,还可以使用heavy对于聚合的结果进行条件筛选啊,另外还可以使用grouping sets啊,分组集设置多种不同的分组情况进行统计啊,所以这些跟这个经典的CQ用法完全一致,我们完全可以把CQ当中的用法全部迁移过来。
08:07
所以CQ对于这个原生CQ的支持还是非常强大。这是关于分组聚合。
我来说两句