00:00
在上一节中我们介绍了动态表和持续查询的概念,那这个概念整体来看还是比较抽象的,所以这一节我们要结合一个具体的例子,详细的讲解一下动态表转换的过程,我们要从原理进行深刻的理解。那整体来看的话,如果是一张动态表和流要进行相互转换,自然就联系起来。之前我们曾经做过data stream和table之间的相互转换,在代码当中其实非常简单,一开始我们介绍的简单事例就是这样去做的,我们先去创建一个数据流data stream,然后只要调用一个from data stream方法就可以得到一个table。那最后如果说我们把查询转换处理出来的结果表想要再转换成流打印输出的话,那只要再调一个to data stream to stream方法或者是to change stream方法,就可以把它再转换成一个数据了。那我们现在说的其实跟这个有点类似,但是又不完全一样,因为我们知道这是在代码当中显示的,把一个data stream Java的数据对象,然后转换成了table这样一个对象,我们是有数据类型、数据结构上的转换的。
01:15
而事实上,在flink table apiq底层的处理过程当中,即使我们没有用到。没有一开始把这样一个stream定义出来。直接就是读取外部系统,直接创建了一个连接器表这样的形式,那它本质也是一个动态表。那自然我们就想到,如果现在是流处理的话,外部系统的数据来的时候,那还是应该一个接一个的来,连续不断的到来啊,那接下来我们这里创建出来的这张动态表,相当于是要把一个持续不断的数据流,首先要转换成一个动态表。所以这个过程在。Flink table API的底层是总要去做的,即使我们不做显示的到table的转换,我们也需要考虑。
02:08
是怎么样能够把动态的数据流变成一个动态表的?所以接下来我们主要就考虑这个过程,那为了更好地说明这个转换的过程,我们还是用之前所举的这个例子,也就是说直接读取。在网站上的点击访问行为,每一次点击都是我们这里边的一个日志事件,写在日志里边,数据的类型被包装成了一个叫做event的类型,里边有三个字段,用户名、用户访问的URL,还有访问的时间戳。那么。如果说我们当前直接读取数据源,把所有的数据解析出来,然后定义对应的每一个字段都是我们这里表里的一个列的话,那么接下来得到的这个table,或者我们在章节里边所介绍的是叫做table,这个没关系,那这就是一个完整的动态表了。那这个读取的过程到底是怎么样的呢?其实我们知道当前的数据流来了之后,如果我们把它叫做stream的话,每一条数据来了之后,放在这个这张动态表里,其实就是一行数据。
03:23
所以我们对于这个动态表的操作,那就是。在末尾不停的去添加数据。所以当前的操作。就是每来一条数据就执行对动态表的一次插入,那我们现在得到的其实就是一个只有插入操作的更新日志流,利用这样一个更新日志流就可以构建出对应的table这样一个动态表,随着点击事件的不断到来,那么当前的这个动态表也会不停的向下增长。
04:00
这就是我们能够想到的一开始。数据流到动态表的转换过程。具体来看的话,当然就是当第一条数据Alice点击的一次访问操作这条数据到来之后,那我们就在当前的表里插入第一条数据。然后接下来第二条数据又来了,这又是一个插入操作,哎,那我们想到如果前面加上了当前的kind的话。行的类型的话,每一个其实都是一个加I。所以接下来第二条数据爆破的访问数据我们就追加在后边。当前的表就。行数变成二啊,那后面也是每一条数据的到来,都会对应着表里边一行数据的增加,插入我们的表,就随着当前的数据越多,那么表整个的大小就会越大,这就是当前。数据流到动态表的具体的转换过程。我们会有另外一个问题,那就是说那这张表在不停的增大,我们在内存当中,如果要一直把它存下来的话。
05:06
之后岂不是不断增长会撑爆内存吗?随着时间的推移,一定是可以把内存全占满的。事实上并不是这样,因为我们现在处理的所谓的table,我们只是把它画出来,看起来这是一个拥有全部数据的这样一张表,实际上在真正的流处理里边,我们处理的是什么呢?就是它的更新日志流,也就是说我们通过对应的每一个加I,然后这样的一个更新插入操作,就可以表示出当前这个动态表的变化转换增长的过程,那这样的话,我们其实根本没有必要保存这张表里的所有数据,只要不停的处理当前它的每一条更新日志就可以了。包括后边的各种表的转换查询,我们其实都可以基于每一条更新日志来进行处理。
06:01
那接下来我们可以再继续去考察基于当前的动态表去做的查询,那就是第二步,我们接下来要去写一个CQ应用到当前的动态表上。所以我们知道这个过程,因为表是动态的,所以这个查询也不可能一次就查完就停止,所以这个过程是持续查询的过程,而且持续查询也会生成得到一个结果表啊,那也就是从一个动态表经过查询转换得到了另外一个动态表。我们还是看之前的例子,我们之前定义了一个Q,比如说select user和count URL c from,我们当前的这张表user。所以这个过程其实我们就是要统计,按照用户去进行分组,统计每一个用户他当前对于页面的点击访问次数。那在这个过程当中,我们就会发现经过转换之后得到了一个比如说叫做URL table这样的一个结果表,那这个结果表里面应该只有两个列,两个字段,一个是user,这是一个string类型啊。那如果在。
07:11
对应着我们在MYSQL里面定义的话,那就是还有一个是CT,这是统计出来的访问次数,在我们这里是长整型,那对应在CQ里面就应该是in,这是我们已经知道的定义出来的东西。所以接下来我们通过这个event table的一个持续查询,应该要得到这样的一张表,有两个两个列,两个字段的这样一个动态表。那我们看一下得到的内容应该是什么样的呢?想到当前的user,如果说之前没有出现过的话,诶,来了一个新的用户的点击,那显然是要在最后的结果表里边插入一条数据的,这个是没有问题,比方说这里来了一个A,然后点击一次。但是我们会想到,如果接下来。B也点击了一次,又来了一个A的点击访问事件的话,那很明显我们的这个查询转换就不是在后边追加一个A2。
08:10
而是要把之前A1这个一直接改成二,所以这个过程我们就会发现它是有所谓的更新操作,既包括简单的插入操作。加I也包括更新操作,那更新操作怎么来表达呢?对于我们当前的一个流而言,我们知道。转换过来之后,我们并不会去完整的存当前这张表,所以我们只是存它的更新日志,所以对于这张表的一个更新操作而言,它的更新日志就应该是一行数据,一条数据的更新就。应该用两条数据表示。一个是。减U减掉之前的A1。
09:00
原来的数据,然后另外还有一条加U表示。增加一条更新之后的新数据,所以相当于是把我们一次查询修改的这个操作,Update操作拆成了两步,一步删掉原来的值,第二步增加一条新插入的值啊,所以这样的话就相当于把之前的数据做了一个更改啊,那我们知道这样的一个表,如果要想转换成stream的话,那就不能直接to stream,直接把这个对应的数据,所有的这个更新日志打印了,而是要。To,因为当前还包括它的不能直接打印数据,还得把当前做的操作也完整的打印出来。这样我们也就可以更加深刻的理解之前在代码当中为什么经过聚合处理之后有更新操作了之后就不能直接to了,而必须要出。
10:03
我们可以再从图上更加详细的看一看当前这个持续查询以及生成的动态表,这个结果表到底是怎么样的一个变化过程。我们已经知道之前的table,也就是我们初始的输入表。它是一个不断增长的。一张动态表,哎,那所以这里边我们如果放在流里的话,每一次都来一条新插入的数据。然后呢,经过中间做了一个持续查询转换,这里边我们因为有count URL这样一个聚合函数group啊,做了一个分组聚合,所以我们最后是得到了一张有更新操作的动态表,那这个更新操作我们会发现,首先第一条数据,Alice的访问数据来了之后。经过当前的一个聚合统计。原原始一开始如果输入表没有任何数据的时候,当然最后的结果表也是空的,然后第一条数据来了之后呢,那首先这里多了一个ALICE1。
11:05
然后接下来第二行数据,Bob的一个点击数据来了之后,那现在改变变成的是。在后边追加了一条数据BO1啊,那这个只是做了一个单独单纯的插入操作,那不涉及到更改,这个比较简单,而后边如果再来一条爱的。访问数据的话,那么接下来我们就不是简单的在后面做追加了,而是要把之前爱丽丝一这条数据修改成爱ice丝二,诶,这就是我们说的当前的这个爱丽丝的数据进行了一个更新操作。然后再往后看,如果又来了一个carry的访问数据的话,这个可以继续追加到后边。在这个。整个处理的过程当中,有单纯的往后的追加,也有做了更新的这样的一个查询转换。
12:00
会发现最后得到的结果表,动态表里边到底有没有更新操作,看起来好像主要是跟这个CQ的形式有关,如果我们这里边使用了分组聚合这样一个操作的话。看起来我们最后就会有更新操作,然后如果进一步要再转换成流的话,可能就会有点麻烦,就是我们说的必须要把当前的看,你到底是减U加U,这个要表示清楚,要不然的话只把数据输出的话,那就说不清了。那我们自然就想到一个问题,如果对于这样的一个查询,我们可以把它叫做更新查询,那如果说我们当前没有对应的。分组聚合这个操作的话。对应着在之前的代码当中,那就是只是简单的前面做了一个提取s user URL from表,那这样一个转换相当于就没有做任何的更改,我们就知道它就可以只是来一个数据就输出一个,来一个数据输出一个,那之前的动态表它是不停的插入数据来一个一个的来,那经过这个CQ转换之后的动态表当然也是一个一个的来,直接插在后边就可以了。
13:17
那对于这样的查询操作,这样的持续查询,我们就可以把它叫做追加查询。所以呃,后面我们可以提出这样一个更新查询和追加查询的概念,就是只有插入操作,对于我们结果动态表的操作,只有插入没有更新,这样的持续查询就被称为追加查询。那我们就知道了,对于更新查询得到的结果,最后我们是必须要to changelo STEM才能把它转换成流。而对于追加查询的。持续查询而言,我们可以直接调用to stream,把它转换成流,因为当前就是一个一个数据来的,有点像我们之前讲到的data stream API里边简单的map filter flat map,类似这样的一些操作,那不涉及到分组聚合。
14:13
当然了,对于追加查询,我们也可以像更新查询一样,调用to change log stream,那它对应的changelo,那就全部都是加I,它的就都是追加嘛,都是插入。这样看的话,好像我们就可以总结一个规律,那是不是如果用到了聚合。在之前的结果上有叠加,那么就会产生更新操作,如果说没有聚合操作的话,那是不是我们所有得到的就全部都是。追加查询呢,就直接可以to stream里边所有的这个结果表里边就只有insert操作呢。但事实上。判定到底是追加查询还是更新查询的标准。并不是说。有没有用到聚合,而是说就是说看这个结果表里面的数据,它到底有没有更新操作。
15:07
诶,那可能我们就想到,那既然是做了聚合,怎么可能它没有更新操作呢?真有这样的情况,比如说窗口聚合。比方说我们还是呃考虑开一个统计窗口,这里可以简单举一个例子,就是在之前的这个E的事件基础上,用户点击访问的事件,我们可以统计每一个小时之内所有用户的点击次数,类似类似于一个PV的统计,那所以在当前的统计结果里边,我们可以做这样的一个定义,就是首先。要根据用户去进行划分,看每一个用户点击了多少次,所以包含一个username啊,User用户的名名称,然后呢,同样有一个访问的次数,CT这个都是一样的啊,跟之前是一样的,只不过之前我们是持续不断的去聚合,看当前用户到底访问了多少次,来一个就叠加一次,来一个就叠加一次,而现在呢,我们是要开个窗口。
16:08
所以这个窗当前结果里边就还应该包含一个窗口的信息,比如说我们当前就以窗口的结束时间叫NT作为当前窗口的一个标志,那这样的话得到的结果就应该是只在当前一小时这样一个窗口内,每一个用户访问的次数统计。那整体来看的话,跟之前的分组聚合整体过程是差不多的。我们会想到,当原始的动态表不停的插入新的数据的时候,我们可以看一下。不停的插入新的数据的时候,那得到的这个result表里边也会不停的更新当前每一个用户他访问的次数,比如说我们当前这张表啊,那我们统计一小时,那可以看到当前的时间戳后边我们为了看到清晰,专门后边乘以了6万,那如果当前这个时间戳是毫秒数的话,6万。
17:09
当然就是60秒啊,那所以就是一分钟了,这样看的就非常的明显,我们的第三条数据,这就是在第25分钟的时候插入的啊,那第四条数据是55分钟,所以很明显前四条数据。当前就是在从零到第一个小时之内的,如果我们这里表示的是小时数的啊话,那就是从0.00分到01:00之内的四条数据,而后边呢,呃,后边我们看到这是第61分钟,90分钟,110分钟,那这个都应该是在。1.00到。2.00之间的所有的数据。呃,我们前面这个是不包含一点整的这个数据的啊,这样的话我们就可以看到当前可以划分两个窗口,一小时的时间窗口,我们接下来可以看一下,在这张表上边,这本来还是一个原始的输入的动态表,那每一条数据都是插入进来的,都是简单的用户的一个访问事件,接下来我们应用的这个CQ跟之前就有所不同了。
18:20
之前我们直接就是by user,然后做了一个count统计,现在呢,诶,那就需要不仅要group by user,因为我们还要开窗口进行窗口聚合,在fliq里边,窗口聚合可以用这种方式,就是group by user,然后后边再加上窗口的信息,窗口的信息呢是Windows start和window and,这是。Flink CQ里边进行窗口聚合的一个固定写法啊,那就是把窗口的起始点和结束点都作为当前分组的K传输进来,然后另外这里面front table的时候呢?哎,那就不光是要有当前的even table,还应该要指定当前的时间戳到底是什么啊,所以我们这里边时间戳是TS,另外还要有当前窗口的定义,那当前是一个滚动一小时滚动窗口,它的定义是。
19:17
Tbo,我们看到tbo,这很明显这就是滚动窗口的含义了,里边传入了当前的事件表,输入的事件表以及时间属性字段,另外还有一个当前滚动窗口的大小,INTERVAL1HOUR啊,那很明显这就是长度为一小时的滚动窗口。那关于窗口的使用和定义,我们会在后边章节里边再去做详细的解释,这里的话我们只要知道用这种方式定义了一个滚动一小时的滚动窗口就可以了。然后我们要提取的字段呢,就是前面说的user,还有window,我们叫as andt,就是窗口的结束时间,就是用这个window关键字提取出来的。另外还有count URL CT,所以我们最后得到的result结果表也是一张动态表,里边就只有三个列,User MT和CT。
20:12
啊,我们这里边的MT不是直接写的时间戳,这里是写成了我们更加熟悉的十分秒的这种形式啊,所以这个看就更更加明确一点,就是一点钟结束的这个窗口,两点钟结束的这个窗口。所以我们现在看到了正常,我按照我们的思考方式的话,那应该是流处理嘛,所以当前输入的table,它是一条数据一条数据来的。Alice丝第一条数据来了,访问数据来了,那我们这里应该有一个Alice andt是一小时第一个小时,然后CNT应该是一,然后接下来Bob第二条数据来了啊,那么追加在这里边,后面追加一条BOB1小时一,接下来Alice呢,又来了两条数据,所以我们应该是更新这张表里的爱ice丝一小时,把一改成二,改成三,所以最后是这样的一个结果。
21:09
一小时的时候,我们应该能够得到这样一个结果。在这里我们还有一个具体的实现了窗口聚合的代码示例,那我们这个会放在下一节讲解时间属性和窗口的时候,再统一的实现这段代码。那后面呢,我们还需要再介绍一个,就是所谓的查询限制,这主要是考虑到在实际应用当中,有些持续查询,它可能会因为计算代价太高而受到限制。那什么叫做代价太高呢?那有两种情况,一种就是我们所说的状态太大。我们我们知道使用持续查询做流处理的时候,比方说像前面我们讲到的这个窗口进行分组聚合,那在这个过程当中,我们可能需要把中间的那些状态都保存下来啊,这个其实跟窗本身我们做窗口的操作是一样的,我们前面如果说只是做一个增量聚合的话,要保存的可能只是当前user对应的一个count值。
22:14
只是一个计数器就可以把它保存下来,然后增量聚合,那有些场景下,可能我们是要把当前窗口内的所有数据都存下来的,诶,那这个如果说我们开这个窗口啊。假如说持续查询要要保存几周甚至几个月的数据的话,那整个这个要处理的数据总量可能非常非常大。所以在这个过程当中,如果状态是逐渐增长,不停增大的话,那是有可能会耗尽我们的。内存空间的,这样就有可能会导致查询失败。那另外还有一种情况就是更新计算的复杂度有可能会很高。的更新计算的复杂度,那指的就是说每来一条新的数据,我们到底要改什么?像前面我们如果只是一个非常简单的,如果我们只是一个非常简单的count计算的话,哎,那很很简单吧,就是找到对应的那个user,每来一个数据我就加一,每来一个就加一,啊这个不需要更改太多,但是我们想另外一个例子,假如说是一个rank函数。
23:18
我们要更新当前数据的排名的话,那一条数据的改变就有可能导致他之前之后的所有数据的排序排名发生变化,诶那比比如说我们这里边有一个另外的一个CQ,我们当前的这个CQ是select user和rank啊,后面是用到了一个所谓的开窗函数over,这个也也是放在后边,我们讲到窗口的时候会专门的去做讲解,那我们现在只是知道相当于我们就是基于某一个范围进行一个开窗。然后接下来呢,是要。我们要选取当前这个范围内的max ts啊,就是最大的时间戳,而且接下来还要计算一个排名。
24:06
所以接下来最大时间戳我们知道其实就是在一段范围内的最后一次点击的时间嘛,我们根据这个用户最后一次点击时间做一个重新排序的计算,每来一个数据就要对所有的排序进行重调,那这个过程很明显啊,用户的数据越多,更新的难度就越大,这个耗费的代价是非常非常大。这样的查询操作就不太适合作为连续查询在流处理里边进行执行啊,那所以我们在实际应用的过程当中,也需要去考虑这样的两种情况,就是状态,如果要是在不停的增长,可能增长到非常大,或者说更新计算,随着数据的增大会越来越麻烦,会非常复杂的话,我们往往就要谨慎的去使用持续查询。这是关于。
25:01
开始的时候,把一个真正意义上的数据流转换成动态表,然后进行持续查询的过程。
我来说两句