00:01
接下来我们要介绍的是flink内存管理当中非常重要的一个概念,也是实际项目应用当中非常重要的一环,那就是状态的生存时间。也就是所谓的TTL,这个概念我们并不陌生,在很多工具或者数据库里边都有类似的概念,最典型的当然就是red了啊,Red作为一个内存数据库,它里边如果说我们对它没有任何的限制的话,当数据量越来越大啊,非常庞大。占据了所有的内存空间的时候,那很显然就会极大的影响我们的系统性能,那所以对于red而言,它是可以针对每一个去设置一个它的生存时间T的,超过这个时间当前就相当于失效了。在fli当中呢,其实也是非常的类似,因为我们知道flink的分布式架构的基本思路,也就是一个基于内存的计算,这样的话可以让我们的流失计算更快,更有效率。那当然这同样带来一个问题,就是很多状态那都会随着时间的推移逐渐增长,它是放在内存里的,那如果不加以限制的话,那最终内存空间就会耗尽。
01:21
啊,那所以怎么样去解决这个问题呢?一个最为简单的思路,那就是我们考虑在代码里边,只要当前的状态已经没用了,那就调用点clear方法去清除当前的状态,啊,这个我们在之前代码当中都已经做过类似的操作,我们应该养成这样的好习惯,就即使当前的状态不清理也不影响逻辑,那我们在它不用的时候也要把它清理掉。相当于是回收内存的一个工作,因为我们知道当前flink里边的状态管理并不完全依赖于JVM啊,所以你不能把它交给GVM去做,直接交给GM去做GC就完了,必须要自己手动的进行一个收回收和管理。
02:08
这是第一种最简单的方法,但是我们想到有时候啊,在代码里边,这个科尔可能不是随便都能去写的。因为你一旦写了这样的一个clear的话,那么每一次调用flaman,像我们这里的flaman,或者是process process function里面的process element的方法,每一次调用它都会执行这样的一个clear,这样的话可能对于我们的业务逻辑就不对了,所以往往这个是要随着某一个判断条件的。而有时候呢,这个判断条件可能又不是特别的清晰,不是特别的准确,我们并不知道一定是什么时候才能把它清空清除,那这个就判断就会非常的麻烦,代码设计也会增加很多难度,那怎么处理这种场景呢?所以这里面我们的思路就是可以配置一个状态的生存时间TT。
03:04
如果状态在内存里边,它存在的时间已经超过了这个TTL值的时候,不管它当前我们有没有调用点clear方法,可能我们根本就没有手动去clear它,但是这个时候只要超出了TT时间,就把它直接清除掉。那具体在底层实践上,我们应该怎么实现这个东西呢?呃,一个简单的想法是,既然它是时间嘛,那我们可以设一个定时器啊,就是当前这个状态已经创建,创建出来的时候啊,状态已经定义好的时候,我就设置一个定时器,然后到这个定时器的时间,不管三七二十一,我直接就调一个它的clear方法不就完了吗?这跟我们在代码里边去设一个定时器,然后去调clear其实是一样的。但是这种方法太简单粗暴,它对于很多场景是不适用的。我们简单思考一下就会发现,对于一个状态它是否失效,你不应该只是创建之后,然后直接就让它。
04:05
到一定时间就直接失效了,因为在代码当中有可能我们的状态会不停的不停的更改,不停的update。这个时候状态如果update之后,很显然当前它是一个活跃的状态,我们就不应该说它是因为一般情况我们设置这个生存时间啊T,其实是这一段时间内,这个完全空了,完全没有用,空闲一段时间之后,我们认为它没用了,才把它清除。那如果说在这段时间内,他还不停在更新,不停在用,很显然它应该继续保持下去。所以我们就想到了,那要这样的话,每一次出现状态的更新update操作的时候,我还应该就是把它的TT时间,它的真正生存要清除的那个时间再往后推移,以现在这个节点作为起始,再去加上TT时间,才是他要清除的时间。
05:04
啊,那这样看的话就稍微会麻烦一点,那难道说代码里面每一次更改我都重新。更新它的,呃,要清除的时间,然后我再去拿一个,拿一个专门的进程去扫描当前所有的状态,然后看它是否失效吗。这是一种思路,但是显然它的效性能是比较低下的,效率比较低,那这里面有一个很好的解决方案,就是我不需要随时去扫描,而是怎么样呢?在状态里边给它配置多一个属性。也就是说相当于我现在给状态附加了一个TT属性,那么。有了这个TTL属性之后,我就知道当前状态的失效时间到底是什么时候了。那每当我们创建一个状态的时候,它的失效时间就等于当前创建的时间加上T时间。
06:02
之后如果有对状态的访问和修改,对状态做了操作的时候,那这个时候我们就可以更改它的失效时间。同样也是以当前的时间再去加上TTL就可以了,什么时候真正的要去清除这个状态呢?因为我们知道超过这个状态的清除时间的时候,我们总还得去扫描一下,找到这个状态,判断它的失效时间已经达到了,这个时候我们才能够去做这个操作。那是不是我们应该去。定一个定时器呢,啊,其实不不需要,我们只要。在下一次要用到这个状态的时候,再去做判断,判断它是否超过就可以了啊,在这之前这个状态还可以保存在内存当中,因为它根本也没有用到吗?只有在用到的时候,诶,这个时候我再去做一个判断,这个时候就可以,如果超过十秒时间就可以把它清除掉。接下来我们可以在代码里面去做一个具体的配置,那在代码里面呢,最关键的其实是要创建一个所谓的state配置对象,它主要是要针对状态去做一些配置,把它配置好了之后,然后传递给对应的状态描述器。
07:17
调用状态描述器的enable time to方法,然后就可以启动状态的TT功能了。接下来我们可以在代码里面做一个具体的实现。这个我们直接可以放在之前讲过的state test这个代码里边去做一个定义就可以了,因为在这里边我们已经定义了很多个不同类型的状态,那接下来这个T的配置需要在哪里去做呢?很显然这是针对整个状态有效,当然就应该在open生命周期把这个状态控制句柄获取到的时候,诶,或者说是我们针对它要去定义这个value state script啊,对应的这个状态描述器的时候,这个时候我们可以把它做一个单独的配置。
08:06
因为后面还要用到这个配置当前的这个状态描述器,所以我现在可以稍微的更改一下这段代码。我们可以把。状态描述器单独抽出来。我们可以把这个就叫做一个啊value state script,然后下边单独的把value state script传进去就可以了,那接下来呢,我们可以以这个状态的描述器为例,然后看一看怎么样给当前的我们这个直状态my value state去进行TTL的配置。配置状态。TT。配置TTL的核心其实是要创建一个所谓的state t t啊,那所以这个创建的过程。
09:00
T,我们就把这个叫做。它的创建过程呢,其实也非常的简单,这是固定的啊,直接调用STEM这个。我们看到有一个new builder方法,诶,这个进去之后我们就会发现啊,他要创建的其实是一个builder,这个builder。是它的一个内部类,这个内部类有什么用呢。我们会看到下边它有一个build的方法,这个build方法真正会创建出一个state t TL big啊,那跟之前我们看到那种builder.build的那种调用其实是非常类似,这里也是一样啊。那原因自然就是当前。State TT这个类它的构造方法。我们可以看到它是私有的。所以对应的那些配置项,我们都应该在builder里边去进行配置,然后去调用它的build方法就可以实现,这样的话,整个的流程非常的明显,就是这样的一个配置的过程。
10:08
那这个new builder builder里边又应该传什么参数呢?我们看到它只有一个T参数,这个time就是一个TT,所谓的这一个TTL,当然就是我们定义的到底生存多长时间就效它的那个生存时间了,那这里我可以直接去定义一个time。比方说我们就直接time.hours啊,一小时失效,这就是一个非常典型的TTL的设置。当前这个状态在内存里边,如果超过一小时都没有使用,都没有更新,那么接下来我们就直接把它清清除掉就可以了。好,那这是基本的一个配置,那我们会发现啊,实际它的底层并不是说到了这个时间就直接清除的,它其实并不会直接清除,而是相当于有一个标记说,诶,它的清除时间已经到了,那接下来其实是到了。
11:06
真正下一次要用到它的时候,才会去判断我要不要把它清除,到底还能不能用啊,所以接下来呢,我们还有两个非常典型的对于这个builder啊,有两个非常典型的可以去配置的参数。我们看到set,一个叫做set update type,也就是说更新的类型,什么叫做更新类型呢?也就是说什么时候可以去更新状态的失效时间。诶,之前我们说当前如果是创建一个状态的时候,当然这个时候应该去设置,就是首先更新它的这个,相当于给一个初始值设置一下它的失效时间,那之后什么时候还能够去更改这个失效时间呢?我们自然能想到,如果要是做了修改操作、update操作的话,当前状态已经被写入了,重新写入了,那自然可以更改。
12:02
另外还有一个就是如果说我们做了访问操作,对状态做了读操作的时候。这个算不算更新可以更新它的失效时间呢?啊,在默认情况下这个是不可以的,也就是默认情况下只有创建状态和更改状态有写操作的时候才能更新失效时间。那所以这里面我们可以。传入不同的参数配置当前的up type来控制到底什么时候可以更新失效时间,那所以这里面要传入的东西啊,其实是一个。我们可以看到是一个state config这个类下边的。Update type这个类,那当然它下边就是一个枚举类型啊,我们可以看到update type它本身是一个枚举类型,里边一个叫做on create and write,另外一个叫做on read and write。
13:01
很明显,On create and write,就是说只有在创建和更改有写操作的时候才去更新当前的时效时间啊,那如果要是read and write的话,就是无论读写操作都去更新当前的时效时间啊。默认情况下,这里边其实是create and write,如果我们想要更改的话,可以把它更改成read and write。就是。所谓的更新类型,另外还有一个,前面我们也看到了,Set里边有一个非常明非常明确的一个设置,叫做set state visibility。这个配置是配置什么呢?状态的可见性。什么叫做状态的可见性?那其实就是说,因为我们当前的清除操作,它并不是立刻发生的,就到了一小时的时候,我们知道我并不会随时去扫描,也不会去设置定时器之类的东西,而是等到下一次要去访问或者说修改这个状态的时候,才去判断它的失效时间是否已经超过当前时间了。
14:10
诶,那对于这种情况,如果说我发现它已经超过失效时间的话,是直接把它清除掉呢,还是说假如说我当前是一个访问操作,当前的这个状态还可以拿出来继续用呢,因为它并没有被清掉呢。所以接下来我们要判断的就是这个特点,那当前能够配置的当然了,默认情况下自然是不能直接去做这样的一个,呃,输入自然是不能够直接去访问到的啊,所以我们这里可以看到。默认情况下啊,那其实应该是要返回一个state visibility这样的一个枚举练习,它里边也是有两个,一个叫做return。Expired if not cleaned up,另外一个叫做never return expire,也就是说你到底是如果说还没有清掉的话,我就直接把它返回,只就是假如失效了也没关系啊,只要还没清掉。
15:10
我现在如果要访问的话,就直接把它返回,另外一个是永远不要返回失效的数据。啊,那默认当然是永远不要返回失效数据了,如果我们想要还要拿到的话,可以把它配置成这样。这就是对于TTL的非常典型的两个配置项,这是最为常用的配置项,那有了这个配置之后,接下来我们就可以直接对于比方说的们当时定义的叫做state。把这个要用一个enable方法。然后当。这样的话,当前的描述器就有了对应的失效时间的配置,而且有了相关这些配置属性。那么接下来用这个描述器所描述的。
16:04
状态my value state。就有了对应的TT这样的特性,这样可以去使用。这里还需要注意的一点是,当前我们配置了一个一小时。之前我们说过,Flink里边时间是有不同的语义的,那当前这个一小时到底是。处理时间还是事件时间呢?啊,那可能我们自然会想到,哎,那就看你前面它本身定义的这个过程了嘛,那到底是事件时间的语义,呃,还是处理时间的语义不就可以了吗?需要注意啊,早期的一些版本里边,我们可以在整个环境里边全局的去设置当前的时间语义,当前的时间特性,但现在已经没有了,只有在后边用到的时候才会有时间特性的定义,而我们现在呢?用到的这一个时间,它只能是处理时间,目前的flink只支持处理时间与以下的TT设置,所以这里边我们的一小时,那就是系统时间,机器时间,过一小时之后自动的把它清理掉就可以了。
17:14
这就是关于TT的配置和使用。
我来说两句