00:00
接下来我们已经知道了flink里边的状态管理是什么样子的,而且呢,我们也会用,呃在在代码里边知道该怎么样去利用k state去进行状态编程啊,另外我们还学习了process function,知道在里边呢也可以利用状态,那呃讲了这么多,Flink底层我们已经知道他有有一整套的这个状态管理机制了,但是本身就是flink他自己是是是真的是就是把这个状态就是当成我们的这个,呃,内部的这个变量直接放在内存里面来做的管理吗?那如果说涉及到后边,我们还要讲到还还会有容错啊,如果说发生故障的时候,我们说它需要做一个存盘,然后从持久化的存储空间里边把它读取出来,然后再恢复状态,那这些操作看起来不像是我们,而且就好像跟我们平常执行任务应该是两回事,对吧?呃,所以说这部分内容我们其实在flink里边,它并不是。
01:00
让我们的任务来去做的处理,而是用了一个可插入的组件,专门来控制当前状态的存储、访问和维护,包括这个恢复啊,容错方面的机制,这个组件就叫做状态后端state back end,好,这个组件其实在我们平常应用的时候,大家可能呃就不会接触太多,但是呢,我们一般情况在生产环境里边肯定要去做一个配置,就是你要知道我们当前的所有的这个状态是存在哪里的,这是取决于这个状态后端的。啊,那所以这个状态后端主要是用来干什么的呢?它就干两件事儿,一件事是本地的状态管理,就是我们前面说的所有的那些状态啊,啊,不管是这个operator state还是kid state啊,不管是value state list state还是map state,还是这个reducing state,它到底是怎么样去做,呃,序列化,反序列化,对吧,到底是怎么样去呃做这个存储和访问的呢?哎,这是由状态后端来控制的。
02:06
像前面我们讲的这个状态编程这块用到的状态都归他来管理,另外还有一个事情归他管,那就是把这个状态保存成检查点checkpoint,然后把它写入到远程的存储空间里面去,也就是说做存盘,然后之后如果发生故障的时候,从远程的这个存储空间读出来,再把状态恢复,哎,这些事儿都是他来做的。所以之前我们讲的主要是它的这个关于本地状态啊,怎么样去定义,怎么样做状态编程,有哪些状态,主要是讲这一部分,然后怎么样去访问和存储,存储,那另外还有一部分是这个容错方面的checkpoint,我们放在后面来讲,那现在呢,大家先要搞清楚在flink里边到底有哪些不同的状态后端。这里边要给大家把这个概念提出来,就是在弗尼克里边主要有这样的三种选择,我们在代码里边,或者说我们在这个集群配置的时候,可以指定某一种,这三种里边首先一个是memory state杆。
03:14
从字面上就知道了,这是一个内存级别的状态后端啊,大家看这个内存级别状态后端就是什么呢?就是我们所有的那些k set啊,啊,所有的那些监控状态,就都作为这个内存里边的一个对象管理起来就完了啊,所以当前在做管理的时候,它会把它放到哪里去呢?就直接把它存在了task manager的JVM堆上啊,就平常我们这个正常来讲,管理的这个状态呢,一般情况我们说它是有一块单独的这个manager memory去做管理的,对吧?那这里边你如果要是说。用了这个内存级的状态后端的话,它是直接就把这个存到这个GM堆上了,而我们做那个拆po里的保存的时候,它存在哪呢?内存机动状态后端checkpoint也是直接存在内存里边,只不过是存到了drop manager的内存里边,存到了远程的内存里,对吧?啊,相当于有了一个这个远程的保存啊,所以它的特点非常的明显啊,就是既然都是内存级别的,当然很快了啊,读取写入都很快啊,另外就是保存这个拆point也很快,对吧,只要那个啊传输过去这个往内存里边写是非踌的,那延迟是比较低,但是稳定性就不够强啊,大家想到就是你你在做这个呃,操作的过程当中,如果说出现这个掉电对吧,有有这个任务挂了的话,那那直接这个这个所有的状态就都丢了吗?呃,你如果要是说我们这个本身任务的状态,这个还好说,因为本来。
04:50
不错性考虑的就是它有可能挂对吧?啊,但是你如果要是说这个drop manager挂了的话,它里边的这个内存里边的checkpoint都没有了,这个可能损失就会比较大,然后你没有办法再去恢复状态了嘛,另外就是说你如果要把这个状态都存在,呃,Task manager的GVM堆上的话,会受到GVMGC的影响啊,所有这些都会导致它不够稳定啊,那我们自然就会想到了,能不能有就是相当于把这个持久化做的更彻底一点的这种方式呢?我可以牺牲一点,就是访问的速度对吧,稍微慢一点,可但是我需要保证它更加稳定,能够容错性更强,那就有了第二种所谓的FS state back end啊,顾名思义FS是file system。
05:37
所以它的特点就是把什么存在system上呢?把就是我们说的状态进行存盘啊,保存之后放到远程的持久化的文件系统上,而对于本地的状态呢,它跟内存级别都是一样的,都是放在本地task manager的内存里面的啊,所以它的特点就是说,哎,我同时访问本地这个状态的时候,速度比较快,对吧,访问读取都比较快,那另外呃,就是如果说我发生故障的时候呢,至少是比较有保证的远程的文件系统吧,这个它是掉电也不会丢,我总能把它恢复出来的,这是比较,就是取了一个权衡的状态。
06:19
但是它同样也有问题啊,有什么问题,就是如果说我当前的这个系统特别特别庞大。庞大到什么程度呢?就是我当前的这个状态,在当前task manager的内存里边,你分配这个内存对吧,我们有lo去执行,呃,这个分配的这个内存去存状态的这块空间都根本就不够用了,你扩展都不够用了,对吧,内存这个状态特别多,这种场景怎么办呢?哎,那我们就会考虑到能不能用别的一些存储空间代替这里边本地的这个状态,内存状态的一个存储呢?啊可以的,这种叫做rocks DB stay back,也就是说用rocks DB进行我们所有状态的存储,那大家知道rocks DB其实是一个一种内嵌的这种k value6的存储介质啊,可以把它当成数据库k value数据库来来使用啊,但是它整体来讲就是访问速度会比较快,另外呢,它是可以落盘去去直接写在硬盘上的啊呃,它是持久化的一个这个存储系统,所以说我们它的这个序列化,反序列化,在这个读写操作的时候,你如果它已经落盘在硬盘上的话,那当然就需要序列化,反序列化这个操作是比较耗费访问性能的,它的性能会受到一些影响,但是你既然它可以就是扩展到硬盘上了,那当然整个这个。
07:47
这个就不存在内存大小的限制了啊,在实际使用的过程当中,如果要是你的那个数据量啊,状态特别大的话,用rocks DB基本上都不会溢出,都不会出现这个OM的情况啊,所以这就是在不同的场景下,大家可以选择不同的状态后端啊,那呃,这个。
08:09
就是关于这个具体生产环境里边的选择,大家注意一点就够了,就是memory state,它一般用在哪呢?生产环境肯定不会用,对吧?因为太不稳定了,所有东西都放在内存里,所以一般情况它是用在测试和我们这个调试的环境里面,开发环境里面,而这个生产环境里边呢,文件系统和这个RODB两者都有使用的场景,就是一般情况下我们那个集群启动的时候啊,默认配置文件里面配的是之前我们也看到它是文件系统对吧?File system啊,那如果说我们认为这个当前的内,呃,就是内存放不下啊,内存比较小,然后状态又比较多啊,那这个时候你最好是把它配成RODB,那如果说这个你对这个本身当前的访问速度要求非常快的话,你要求这个性能非常高的话,那就没什么好好办法了,你就配置成file system,直接back end,然后你再扩展内存不就完了吗?啊,你就是加机器加钱嘛。
09:09
就是这样的一个处理方式,好,那这是关于这个选择状态后端的这个这个理论部分啊,那在实际的操作过程当中,其实我们是可以在代码里边给当前的这个flink的流失处理程序,直接去做状态后端的配置的啊这个怎么去配呢?也非常简单,就是环境吧,这肯定又是跟环境相关,对吧?啊,你看我可以直接有一个set,大家看到了啊,就在这个set时间特性下边有一个state back。Set state back,然后这里边我们所指定的就是传进来一个stay back end就可以了啊,你可以去比方说我们用一个,呃,当前的这个memory。Memory怎么memory对吧?State second,你可以直接定义一个这样的state memory state back啊,但这个其实开发环境里边并不需要这么定义对吧?因为默认就是这样的嘛,那或者我们可以定义一个FS对吧?File system system back end啊,你去正确定义,然后里边当然就是需要给一个这个文件路径了,对吧?呃,你如果要是说这个往往HDFS上去放的话也是可以的,你给一个APIHDFS的一个路径就可以啊,所以这种方式还是比较常见啊,比较常用,呃,然后在这种方法里边需要给大家说的是它后边啊大家会看到啊,这个这个已经这种方式已经被就是当前已要要被弃用了,对吧,以后的话是不要直接基于这个就是env点,而是基于什么呢?静态方法对吧?调调这里边stream execution environment,直接调它的set step back and。
10:54
啊,但是现在应该是还还不能直接这么用啊,大家还是直接env去做的配置,然后这里边我们size的过程当中,其实呃,就是这里边啊给这个。
11:08
我们这里边要看的是。这个啊,呃,就是这个呃,File system stay back end,在传这个它的构造方法参数的时候啊,除了直接传一个路径啊,你看这里边这个checkpoint in data UI UI对吧,除了传一个这个之外呢,另外还可以传啊,比方说我们这个size state size的一个呃,Thhold的一个阈值对吧,到底多大还可以传这样的一个参数,那呃,这里边就是有有不同的这种呃重写的这个参数的方法了,大家都可以看一看啊,就是如果实际在调用的时候,可以有哪些传参的方式,可以传一个string,可以传一个uri对吧,可以传一个pass,这些都是正确的传餐方式,然后另外我们可能就想到了,那如果要是配这个,呃,所谓的这个rocks DB,那应该怎么配呢?啊,那有同学可能想到,那就是你用一个rocks DB state back吗?呃,大家看到这里面没有对吧,这里边只有一个SDB,什么compact filter之类的什么strategy,对吧?啊,并没有它的那个state back。
12:11
但是那这个东西怎么办呢?啊,引入另外的包嘛,缺包了,所以官方flink官方啊是给我们有这样的stay的支持的,你需要引入flink stay back end rock DB,把这个引入后边是scale的版本,然后下边是当前啊,就是flink的版本,直接把它放在po文件里边就OK了。放在这儿。好,然后引入这个之后,接下来我们再到代码里边来看,好,现在我们看一下当前,如果再去敲这个rocks DB,哎,现在东西就多了,对吧,我前面少了一个DB,它也能自动匹配到rocks DB stand back,然后里边同样,哎这里边还是要给对应的一个路径,对吧?哎,这里边你看到它这里边是这个set,这里边的时候会发现这个好像有问题是吧?呃,就是我们这里边的这个rock DB stay back啊,本身它应该也是一个抽象的一个stay back end,然后我们在去给它传参的时候呢,你可以传一个这个就是当前也是啊,一个一个路径啊,一个uri,那另外这里边还有一个比较特殊的参数是可以传一个什么呢?Enable incremental checkpoint,这个是表示增量化的,这个checkpoint意思是什么呢?因为大家知道一个checkpoint相当于做一个存盘嘛。
13:38
存盘的时候,它是要把所有的状态都存下来的啊,啊就是那有些情况我们说那个状态特别大对吧,然后拆又是它是自动存盘嘛,隔一段时间就存一下,隔一段时间就存一下,那你每次都把全量的那个数据重新存一份,肯定就会这个性能,性能的这个损耗就比较大,对吧,数据量比较大,那在这种情况下呢,我就允许它做一个增量式的存储,就是基于上一次保存存盘的那个拆point,然后呢,在它基础上把改变的那个那些内容我再做一个保存就可以了。
14:15
啊,当然你如果开启了这种方式的话,那就要求之前的拆油炮你都不能丢,对吧,每一个拆油炮都不能丢,你这样连起来才能把它合成一个完整的状态啊,所以这是关于这个rocks DB的一个配置的过程啊。哦,当然这里边大家看到,就是你在做这个设置的时候,它还在报错对吧?哎,这里边这个报错的时候,我们看一下这个RODB本身,呃,你如果调用这个rock DB stay back这里边可以,呃,它已经实现了我们这里边需要的这些东西了啊,我们看一下这里边报错是报在哪里?好,这里边大家看到,呃,就是这里边可能调用实现的这一个abstract state back end啊,啊,这个可能是有一个版本的冲突,所以说导致了我们这里边稍微有一点问题,这里边再报错啊,大家只要知道这个东西应该怎么用就可以了,就是后续版本正常来讲的话,肯定是没有这样的一个问题的啊,你像之前这个一点十点零和1.9.2,这个正常来讲都是直接这么引入set stay end,然后传进来就没事了,现在这个十点一点十点一好像是有一点问题。
15:29
呃,确实也有可能是我这边引包的时候稍微有点冲突,大家下来之后再再试一下,再测一下吧。
我来说两句