00:00
我们已经了解了动态表和持续查询的基本概念啊,也知道了在这个过程当中啊,主要可以分成三步啊,那三步的话,简单来讲就是先把流转换成动态表,然后把动态表基于动态表去进行持续查询,生成一个结果也是一个动态表啊,那最后呢,再把生成的动态表转换成流,那这个过程就有点像啊,之前我们在简单上手的这个案例当中,我们代码里边做的这个转换操作啊,先把一个流转换成表,然后基于表去进行转换计算,然后最后呢,再把表转换成流打印输出啊,那当然了,这里我们所说的持续查询的这个转换过程啊,并不是代码里边的显示转换,不是数据类型的data stream到table的转换,而是我们处理的底层对于这个数据结构的转换处理,那接下来我们就分步骤一个一个来进行讲解了,首先就是将硫要转换成代表。这一步操作呢,其实是非常容易理解的啊,那其实就是每来一条数据之后,那接下来我们表里边这张表里边就应该增加一条数据嘛,诶,所以我们会看到啊,这个过程当中呢,针对这个动态表更改,它都是一个插入操作,所以我们就会发现这张表的改变,如果说想要写成这个更新日志流的话啊,里边就只有音色的操作,只有加爱操作。
01:25
有时候我们也可以直接把这样的一个流呢,叫做啊颈插入流啊,叫做end only流啊,或者叫做颈椎加流,这就是这个初始的动态表的一个转换过程,为了更加直观的说明这个转换过程啊,我们还是结合一个具体的例子吧,啊,比如说像之前啊,我们写的那个非常简单的例子,在这里呢,哎,我们就不去读取这个文件了啊,直接from element啊,我们看这里写入了几个Alice Bob carry对应的一些访问事件event读进来了,这是一个data stream,然后这个data,基于这个data stream呢,哎,我们就可以去把它转换成表,得到一个table啊,注册一个table,然后呢,直接执行一个c select user,另外我们还做了一个。
02:09
Count URL的操作,我们得到的是URL comtable,这就是之前我们在代码里边曾经做的这个聚合操作。统计每个用户他的访问频次啊,那我们知道这样一个操作,当然是针对之前啊,我们已经得到这个even table,这是一个动态表啊,而且是只有追加信息的一个动态表啊,针对他要做一个持续查询嘛,Select啊这样的一个操作啊,C查询,那这样一个持续查询得到的结果呢,当然也是一个动态表。那至于这个动态表到底是只有追加的操作,还是说会有更新操作呢?哎,那这个过程我们就可以结合这里的输入数据一条一条来做一个分析了,啊,我们可以看这里的图。首先我们可以看到啊,第一条数据到来之后,哎,那当然对应的在even table啊,这张动态表里边就是追加,那后面的数据每一条来了之后都是追加,这张表没什么好说的,关键是看后边的持续查询啊,这里的这个持续查询的过程呢,哎,我们因为有一个count啊统计数量的操作,所以来了第一条数据之后,Alice丝这条访问数据之后,我们对应的这个结果表URL com table里边只有一条数据,Alice丝一。
03:25
然后呢,第二条数据,Bob的访问数据来了之后呢,在后面追加了一条,BOB1统计Bob的这个对应的count值啊,那之前没有过,所以追加,然后第三条数据又是爱丽丝的一条访问数据,这个来了之后。注意,现在就不是在后边追加一条了,而是要把之前的第一行数据爱丽丝一改成。现在的爱丽丝二,所以我们发现啊,当前的结果表当中出现了更新操作。啊,所以我们看到这样一个更新操作,就不能简单的把它认为,诶可以转换成一个data stream,转换成一个流,诶数据一条一条来了的,这个新数据直接输出就完了,不行,我们必须要加上当前它的更新类型,当前的类型信息啊,就是到底你是要做插入还是要做更新,是要删除一条之前的数据,还是要追加一条新的数据。
04:22
所以在这里这样的一条时序查询,我们就把它叫做。更新查询啊,就是所谓的update query。那在这个例子里边,我们也能总结出来啊,为什么它会出现这个更新操作呢?关键点就在于这里边我们使用了抗的这样一个聚合函数啊,就是分组,按照user做了group by分组之后又调用了count做了一个聚合,那这样一个聚合操作的话,很显然就会更改之前的一些结果数据啊,那所以这就涉及到了啊,最终这个结果表里边不仅仅有音色的插入的操作,还有update的操作。
05:02
啊,那如果说这里我们没有做这样的一个聚合操作呢,那就是我们所说的啊,就是开始做的那个最简单的提取操作啊,比如说直接SELECT2个字段,URL user from当前这张表,然后where加一个筛选条件user,等于Alice把Alice的访问数据提取出来,那这样的一个查询呢?啊,当然就是每来一条数据,有可能有对应的输出,也有可能没有,那所有的这个结果表里边啊,结果动态表里边只会有。后边插入的操作不会出现更新的操作,那这样的持续查询我们就把它叫做追加查询a pen query。啊,所以在这里呢,我们会发现啊,如果在代码当中,最后我们想要把这个查询的结果表再转换成流做打印输出的话,那得调用什么样的方法呢?啊,前面我们已经提到了啊,如果是这个更新查询的话,因为结果动态表里边出现了更新操作,那所以我们就只能to change log stream只能调用这个。
06:07
转换成更新日志流的方法啊,把对应的这个更新日志流打印出来啊,那里边的话可能会出现这个加爱插入的一些操作,也有可能会出现更新操作,那就是一条更新,就是减U加U2条更新日志数据啊,这就是我们所说的更新查询那。如果是不涉及到更新操作,诶,如果是一个追加查询的话,得到的结果动态表,哎,那就相当于里边只有insert加I的数据,只有往后面追加,那我们就知道了最后得到的数据,要得到的表想要转换成流的话,可以直接调to data stream方法直接转换就可以,当然了也可以像更新查询一样调一个to change log stream,那我们知道它的更新日志呢,里边就只有加I,没有减U加U的那些操作数据。
07:00
所以到目前为止呢,我们看起来好像可以总结这样一个规律啊,就是什么时候后边需要必须调用to change stream呢?诶,那就是出现更新查询的时候,那什么时候我们执行的这个持续查询啊,这条CQ会出现它是一个更新查询呢?诶那看起来呢,好像就是。定义了类似于分组聚合这样操作的时候,一旦出现了聚合啊,比方说这里我们做一个抗统计啊,那这个数当然要不停的叠加,要不停的更新了,那这个时候结果动态表里面就会出现更新操作,哎,那这个查询就是一个更新查询,对应的后边转换成数据流的话,那就应该是to changelo。啊,这个结论看起来好像没什么问题啊,但事实上呢,更新查询的判断标准。是结果表里面的数据到底会不会发生更改,并不是说我们的CQ里面到底有没有聚合操作啊,比如说像前面我们这里的这个分组之后的count啊,它最后带来了这个结果动态表里边更新操作,所以当然这就是一个更新查询,那有时候呢,我们这里定义的一些聚合操作,它不会引起结果表里面的更新啊,比如说这里举一个非常经典的例子,那就是。
08:19
窗口当中的聚合啊,比如说我们可以考虑啊,呃,在这个流处理里边,我们考虑开一个滚动窗口,统计每一个小时之内所有用户的点击次数,诶,那所以这样的话,我们可以在这个结果表里边啊,我们统计什么呢?加一个字段统计每一个用户,在每一个窗口里边,诶我们需要有一个窗口信息,就是在每一个小时的窗口里边,然后访问URL,总共访问了多少次,它的这个抗值也要统计出来。啊。对于之前我们的那个抗技术,最终聚合的那个结果呢,相当于就多加了一个窗口字段信息,我们这里使用的是NT,也就是窗口的结束时间作为窗口的标志。
09:04
那对应的这个CQ呢,我已经直接写在这里了,这里涉及到了一个滚动窗口的定义,我们现在可能还没有讲到,但是没关系啊,我们大概知道它主要就是定义了一个滚动窗口的聚合就可以了。同样前面我们看到select提取的就是我们定义出来的这三个字段了,User window and as andt,另外还有一个count URL,哎,我们统计当前窗口内当前用户的所有访问次数,那后面from呢?From这个table,我们看需要把当前的这个table还要定义一个窗口,哎,我们定义了一个INTERVAL1小时,很明显要做的就是一个一小时的滚动窗口聚合。后面呢,还要有一个group分组,分组的字段除了user之外,还有窗口的起始和结束点的信息啊,所以我们现在分组就是按照用户和窗口进行了一个划分。这是我们当前进行持续查询的CQ,那当前的数据输入进来之后会有什么效果呢?啊,我们会发现啊。
10:06
首先。输入的数据来了之后,诶,对于我们这个输入的动态表,当然就是一条一条在后面做做追加了,追加进来之后,诶,经过当前这个窗口聚合的持续查询,那是每来一条数据就会有对应的结果输出吗?并不是的,因为我们现在是有窗口,要等到一小时的滚动窗口到达结束时间的时候啊,窗口要关闭的时候才会输出一次统计结果,所以我们看什么时候输出第一次的结果呢?很显然啊,前四条数据。有爱丽的三次点击和Bob的一次点击,我们看到那后边,既然这是一个时间戳啊,我们认为它是一个毫秒数,所以乘以6万,我们知道如果是毫秒的话,6万就是一分钟嘛,啊,所以那就是就是25分钟有一次点击,55分钟又有一次点击,很显然这都是在第一个小时之内的访问时间。
11:04
所以我们可以看到啊,当呃,假如我们当前使用的这个时间与一时事件时间的话,那就是水位线,如果已经达到了第一个小时的结束时间的话,那么第一小时啊,那我们可以认为就是零点到一点啊,或者说我们说这个12点到13点啊,这个都一样。这一个小时的窗口就应该要关闭了,我们输出对应的统计结果,我们看输出的。是这样的两条数据,ALICE3 bob1,好,那对应有这样的四条访问数据的统计啊啊,那我们看到当前的窗口结束时间就是一小时,第一个小时,然后接下来剩下的三条数据呢,Bob的一次访问和carry的两次访问,这就都属于。一小时到二小时的第二个滚动窗口,所以当当前的水位线已经超过第二小时的时候,接下来我们输出的结果又是两条,CARRY2和BOB1。来注意,这里的这个Bob一并不会覆盖之前的BOB1,这并不是一个更新操作,而是一个追加操作,因为每次统计的都只是当前一小时的数据。
12:18
现在统计的这个数据跟之前已经没有关系了,而在这一小时之内啊,我们叠加统计的这个数据呢,并不会一次的输出到结果动态表里面,我们只是不停的做增量聚合得到的结果,最后一次性的输出到结果动态表,所以我们看到啊,结果动态表里面的数据都是。追加进来的只有音色的操作,而没有update操作。这就是我们说的啊,即使是有聚合操作,做了一个抗的统计,也可以最后的结果没有更新,那当前的这个持续查询呢,就是一个追加查询,而不是更新查询,如果我们想要把它做一个转换成流打印输出的话,那其实就可以直接调用to stream方法,然后打印输出就可以了。
13:07
啊,在这里呢,我们文档当中直接把这个完整的实现代码已经列在了这里,这里我们需要注意一点是,既然涉及到了窗口时间窗口的操作,那很显然我们还需要去指定当前的时间语义,然后去从数据当中提取时间戳,然后生成对应的水位线。啊事件时间语义嘛,所以在这里边我们一定不要忘记这样的一步操作啊,我们现在是基于之前的数据流get string,直接就已经生成了水位线,然后后边的话,那就不用再去做另外的定义了,在转换成表的时候,还需要去指定一个专门的时间属性,这里边是调用了一个点揉time方法,好,这就是指定当前的时间字段。关于这个时间属性和窗口的用法,那么我们会在后边下一节里边再展开进行讲解。
14:00
那这里还需要多说一句的,就是关于我们当前去进行查询的时候,查询是有一些限制,因为在实际应用的过程当中,有些持续查询啊,它的计算代价是非常高的,那什么样的这种计算我们需要去进行限制,需要注意考察它的性能呢?一个就是状态的大小可能会随着时间持续增长,因为我们知道做这个持续查询啊,如果把它应用在这个流处理里边的时候,我们流处理的数据是连续不断,无休无止的嘛,那假如说我们这个持续的时间跨度非常长的话,那如果要是运行几周到几个月啊,甚至好几年的话,那我们可能要统计的那个数据总量是非常非常大的。而且每来一条数据,就要把所有的全量数据都做一次完整的查询,执行一句CQ,哎,那我们就会发现啊,随着我们的数据越来越多啊,比如说之前我们记录每一个用户访问UR的次数,诶,那当前这个用户越来越多,那访问的URL次数也越来越大,那么要维护的状态就会逐渐增长,最后可能会耗尽我们的内存空间啊,所以这种情况我们是要对这个查询要注意一些做限制的。
15:15
另外还有一个呢,就是更新计算的复杂程度,这是什么意思呢?针对有一些聚合统计,它可能所要做的每来一条数据之后的那个计算特别特别复杂,比如说什么呢?你像之前我们做的这个count统计,这个比较简单,每来一个就加一嘛,只要找到对应的那个K啊,分组了之后加一就完事了,这个就不复杂,那如果说我们要做的一个不是抗统计,而是。做一个rank排序,Rank我们知道是排名的序号嘛,如果要做排序的话啊,那就相当于我们需要。在每来一条数据到来的时候,我们需要把之前所有的数据啊整合在一起,重新去做一个排序,然后计算当前的rank值到底是多少,这个计算量会非常非常的大。
16:06
这里举了一个例子啊。就是按照每个用户他最近一次的点击时间,我们看是求这个时间戳的max啊,最大的这个时间戳来作为排名的计算标准,做一个排名计算一个rank值,那这样的一个计算的话,很明显就是每来一个数据的时候啊,有可能就会导致用户的排名发生变化,一旦一个用户发生排名发生变化,那有可能啊,别的用户的排名都要顺延,都要发生变化啊,所以这个更新操作的代价是非常非常大的。在这里我们是使用了一个所谓的over啊,开窗函数去做了一个聚合啊,这个我们也会在后边去进行详细的展开讲解。这就是我们所说的持续查询的具体的过程,这里我们一定要注意,就是到底它是一个更新查询还是一个追加查询。
我来说两句